# -*- coding: utf-8 -*-
import re
import time

import psutil
from gevent import GreenletExit

from amplify.agent.common.util.plus import traverse_plus_api
from amplify.agent.collectors.abstract import AbstractMetricsCollector
from amplify.agent.collectors.plus.util.api import http_cache as api_http_cache
from amplify.agent.collectors.plus.util.api import http_server_zone as api_http_server_zone
from amplify.agent.collectors.plus.util.api import http_upstream as api_http_upstream
from amplify.agent.collectors.plus.util.api import slab as api_slab
from amplify.agent.collectors.plus.util.api import stream_server_zone as api_stream_server_zone
from amplify.agent.collectors.plus.util.api import stream_upstream as api_stream_upstream
from amplify.agent.collectors.plus.util.status import cache as status_cache
from amplify.agent.collectors.plus.util.status import status_zone as status_http_server_zone
from amplify.agent.collectors.plus.util.status import upstream as status_http_upstream
from amplify.agent.collectors.plus.util.status import slab as status_slab
from amplify.agent.collectors.plus.util.status import stream as status_stream_server_zone
from amplify.agent.collectors.plus.util.status import stream_upstream as status_stream_upstream
from amplify.agent.common.context import context
from amplify.agent.common.errors import AmplifyParseException
from amplify.agent.common.util.ps import Process
from amplify.agent.data.eventd import WARNING

__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"

# named groups reconstructed from the field list used in stub_status() below
STUB_RE = re.compile(r'^Active connections: (?P<connections>\d+)\s+[\w ]+\n'
                     r'\s+(?P<accepts>\d+)'
                     r'\s+(?P<handled>\d+)'
                     r'\s+(?P<requests>\d+)'
                     r'\s+Reading:\s+(?P<reading>\d+)'
                     r'\s+Writing:\s+(?P<writing>\d+)'
                     r'\s+Waiting:\s+(?P<waiting>\d+)')


class NginxMetricsCollector(AbstractMetricsCollector):
    short_name = 'nginx_metrics'
    status_metric_key = 'nginx.status'

    def __init__(self, **kwargs):
        super(NginxMetricsCollector, self).__init__(**kwargs)

        self.processes = [Process(pid) for pid in self.object.workers]
        self.zombies = set()

        self.register(
            self.workers_count,
            self.memory_info,
            self.workers_fds_count,
            self.workers_cpu,
            self.global_metrics,
            self.reloads_and_restarts_count,
        )
        if not self.in_container:
            self.register(
                self.workers_rlimit_nofile,
                self.workers_io
            )

    def handle_exception(self, method, exception):
        if isinstance(exception, psutil.NoSuchProcess):
            # Log exception
            context.log.warning(
                'failed to collect metrics %s due to %s, object restart needed (PID: %s)' %
                (method.__name__, exception.__class__.__name__, exception.pid)
            )
            # since the PID no longer exists, mark the object as needing restart for safety
            self.object.need_restart = True
        else:
            # Fire event warning.
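            # Most commonly this is psutil.AccessDenied: the agent user lacks
            # permission to read the worker processes' /proc entries.
            # onetime=True keeps the event from being re-fired on every
            # collection cycle.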
            self.object.eventd.event(
                level=WARNING,
                message="can't obtain worker process metrics (maybe permissions?)",
                onetime=True
            )
        super(NginxMetricsCollector, self).handle_exception(method, exception)

    def reloads_and_restarts_count(self):
        self.object.statsd.incr('nginx.master.reloads', self.object.reloads)
        self.object.reloads = 0

    def workers_count(self):
        """nginx.workers.count"""
        self.object.statsd.gauge('nginx.workers.count', len(self.object.workers))

    def handle_zombie(self, pid):
        """
        removes pid from workers list
        :param pid: zombie pid
        """
        context.log.warning('zombie process %s found' % pid)
        self.zombies.add(pid)

    def memory_info(self):
        """
        memory info

        nginx.workers.mem.rss
        nginx.workers.mem.vms
        nginx.workers.mem.rss_pct
        """
        rss, vms, pct = 0, 0, 0.0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                mem_info = p.memory_info()
                rss += mem_info.rss
                vms += mem_info.vms
                pct += p.memory_percent()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.gauge('nginx.workers.mem.rss', rss)
        self.object.statsd.gauge('nginx.workers.mem.vms', vms)
        self.object.statsd.gauge('nginx.workers.mem.rss_pct', pct)

    def workers_fds_count(self):
        """nginx.workers.fds_count"""
        fds = 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                fds += p.num_fds()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.incr('nginx.workers.fds_count', fds)

    def workers_cpu(self):
        """
        cpu

        nginx.workers.cpu.system
        nginx.workers.cpu.user
        """
        worker_user, worker_sys = 0.0, 0.0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                u, s = p.cpu_percent()
                worker_user += u
                worker_sys += s
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.gauge('nginx.workers.cpu.total', worker_user + worker_sys)
        self.object.statsd.gauge('nginx.workers.cpu.user', worker_user)
        self.object.statsd.gauge('nginx.workers.cpu.system', worker_sys)

    def global_metrics(self):
        """
        collects "global" metrics from the N+ API or extended status if
        either is easily accessible; doesn't look for stub_status in that
        case. otherwise falls back to stub_status
        """
        if self.object.api_enabled and self.object.api_internal_url:
            self.plus_api()
        elif self.object.plus_status_enabled and self.object.plus_status_internal_url \
                and self.object.status_directive_supported:
            self.plus_status()
        elif self.object.stub_status_enabled and self.object.stub_status_url:
            self.stub_status()
        else:
            return

    def stub_status(self):
        """
        stub status metrics

        nginx.http.conn.current = ss.active
        nginx.http.conn.active = ss.active - ss.waiting
        nginx.http.conn.idle = ss.waiting
        nginx.http.request.count = ss.requests  ## counter
        nginx.http.request.reading = ss.reading
        nginx.http.request.writing = ss.writing
        nginx.http.conn.dropped = ss.accepts - ss.handled  ## counter
        nginx.http.conn.accepted = ss.accepts  ## counter
        """
        stub_body = ''
        stub = {}
        stub_time = int(time.time())

        # get stub status body
        try:
            stub_body = context.http_client.get(self.object.stub_status_url, timeout=1, json=False, log=False)
        except GreenletExit:
            # we caught an exit signal in the middle of processing so raise it.
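            # gevent raises GreenletExit inside a greenlet when it is killed;
            # it has to propagate here so the bare except below doesn't
            # swallow the shutdown signal.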
            raise
        except:
            context.log.error('failed to check stub_status url %s' % self.object.stub_status_url)
            context.log.debug('additional info', exc_info=True)
            stub_body = None

        if not stub_body:
            return

        # parse body
        try:
            gre = STUB_RE.match(stub_body)
            if not gre:
                raise AmplifyParseException(message='stub status %s' % stub_body)
            for field in ('connections', 'accepts', 'handled', 'requests', 'reading', 'writing', 'waiting'):
                stub[field] = int(gre.group(field))
        except:
            context.log.error('failed to parse stub_status body')
            raise

        # store some variables for further use
        stub['dropped'] = stub['accepts'] - stub['handled']

        # gauges
        self.object.statsd.gauge('nginx.http.conn.current', stub['connections'])
        self.object.statsd.gauge('nginx.http.conn.active', stub['connections'] - stub['waiting'])
        self.object.statsd.gauge('nginx.http.conn.idle', stub['waiting'])
        self.object.statsd.gauge('nginx.http.request.writing', stub['writing'])
        self.object.statsd.gauge('nginx.http.request.reading', stub['reading'])
        self.object.statsd.gauge('nginx.http.request.current', stub['reading'] + stub['writing'])

        # counters
        counted_vars = {
            'nginx.http.request.count': 'requests',
            'nginx.http.conn.accepted': 'accepts',
            'nginx.http.conn.dropped': 'dropped'
        }
        for metric_name, stub_name in counted_vars.items():
            stamp, value = stub_time, stub[stub_name]
            prev_stamp, prev_value = self.previous_counters.get(metric_name, (None, None))
            if isinstance(prev_value, (int, float, complex)) and prev_stamp and prev_stamp != stamp:
                value_delta = value - prev_value
                self.object.statsd.incr(metric_name, value_delta)
            self.previous_counters[metric_name] = [stamp, value]

    def plus_status(self):
        """
        plus status metrics

        nginx.http.conn.accepted = connections.accepted  ## counter
        nginx.http.conn.dropped = connections.dropped  ## counter
        nginx.http.conn.active = connections.active
        nginx.http.conn.current = connections.active + connections.idle
        nginx.http.conn.idle = connections.idle
        nginx.http.request.count = requests.total  ## counter
        nginx.http.request.current = requests.current
        plus.http.ssl.handshakes = ssl.handshakes
        plus.http.ssl.failed = ssl.handshakes_failed
        plus.http.ssl.reuses = ssl.session_reuses

        also here we run plus metrics collection
        """
        stamp = int(time.time())

        # get plus status body
        try:
            status = context.http_client.get(self.object.plus_status_internal_url, timeout=1, log=False)
            # modify status to move stream data up a level
            if 'stream' in status:
                status['streams'] = status['stream'].get('server_zones', {})
                status['stream_upstreams'] = status['stream'].get('upstreams', {})
            # Add the status payload to plus_cache so it can be parsed by other collectors (plus objects)
            context.plus_cache.put(self.object.plus_status_internal_url, (status, stamp))
        except GreenletExit:
            raise
        except:
            context.log.error('failed to check plus_status url %s' % self.object.plus_status_internal_url)
            context.log.debug('additional info', exc_info=True)
            status = None

        if not status:
            return

        connections = status.get('connections', {})
        requests = status.get('requests', {})
        ssl = status.get('ssl', {})

        # gauges
        self.object.statsd.gauge('nginx.http.conn.active', connections.get('active'))
        self.object.statsd.gauge('nginx.http.conn.idle', connections.get('idle'))
        self.object.statsd.gauge('nginx.http.conn.current',
                                 connections.get('active') + connections.get('idle'))
        self.object.statsd.gauge('nginx.http.request.current', requests.get('current'))

        # counters
        counted_vars = {
            'nginx.http.request.count': requests.get('total'),
            'nginx.http.conn.accepted': connections.get('accepted'),
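            # raw cumulative counters from the status payload;
            # aggregate_counters() below turns them into per-interval deltas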
            'nginx.http.conn.dropped': connections.get('dropped'),
            'plus.http.ssl.handshakes': ssl.get('handshakes'),
            'plus.http.ssl.failed': ssl.get('handshakes_failed'),
            'plus.http.ssl.reuses': ssl.get('session_reuses')
        }
        self.aggregate_counters(counted_vars, stamp=stamp)

        # aggregate plus metrics

        # caches
        caches = status.get('caches', {})
        for cache in caches.values():
            for method in status_cache.CACHE_COLLECT_INDEX:
                method(self, cache, stamp)

        # status zones
        zones = status.get('server_zones', {})
        for zone in zones.values():
            for method in status_http_server_zone.STATUS_ZONE_COLLECT_INDEX:
                method(self, zone, stamp)

        # upstreams
        upstreams = status.get('upstreams', {})
        for upstream in upstreams.values():
            # workaround for supporting old N+ format
            # http://nginx.org/en/docs/http/ngx_http_status_module.html#compatibility
            peers = upstream['peers'] if 'peers' in upstream else upstream
            for peer in peers:
                for method in status_http_upstream.UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in status_http_upstream.UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        # slabs
        slabs = status.get('slabs', {})
        for slab in slabs.values():
            for method in status_slab.SLAB_COLLECT_INDEX:
                method(self, slab, stamp)

        # streams - server_zones of stream
        streams = status.get('streams', {})
        for stream in streams.values():
            for method in status_stream_server_zone.STREAM_COLLECT_INDEX:
                method(self, stream, stamp)

        # stream upstreams - upstreams of stream
        stream_upstreams = status.get('stream_upstreams', {})
        for stream_upstream in stream_upstreams.values():
            peers = stream_upstream['peers'] if 'peers' in stream_upstream else stream_upstream
            for peer in peers:
                for method in status_stream_upstream.STREAM_UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in status_stream_upstream.STREAM_UPSTREAM_COLLECT_INDEX:
                method(self, stream_upstream, stamp)

        self.increment_counters()
        self.finalize_latest()

    def plus_api(self):
        """
        plus api top-level metrics

        nginx.http.conn.accepted = connections.accepted  ## counter
        nginx.http.conn.dropped = connections.dropped  ## counter
        nginx.http.conn.active = connections.active
        nginx.http.conn.current = connections.active + connections.idle
        nginx.http.conn.idle = connections.idle
        nginx.http.request.count = requests.total  ## counter
        nginx.http.request.current = requests.current
        plus.http.ssl.handshakes = ssl.handshakes
        plus.http.ssl.failed = ssl.handshakes_failed
        plus.http.ssl.reuses = ssl.session_reuses
        plus.proc.respawned = processes.respawned

        also here we run plus metrics collection
        """
        stamp = int(time.time())

        try:
            aggregated_api_payload = traverse_plus_api(
                location_prefix=self.object.api_internal_url,
                root_endpoints_to_skip=self.object.api_endpoints_to_skip
            )
        except GreenletExit:
            raise
        except:
            context.log.error('failed to check plus_api url %s' % self.object.api_internal_url)
            context.log.debug('additional info', exc_info=True)
            aggregated_api_payload = None

        if not aggregated_api_payload:
            return

        context.plus_cache.put(self.object.api_internal_url, (aggregated_api_payload, stamp))

        connections = aggregated_api_payload.get('connections', {})
        http = aggregated_api_payload.get('http', {})
        requests = http.get('requests', {})
        ssl = aggregated_api_payload.get('ssl', {})
        processes = aggregated_api_payload.get('processes', {})
        stream = aggregated_api_payload.get('stream', {})

        # gauges
        self.object.statsd.gauge('nginx.http.conn.active', connections.get('active'))
        self.object.statsd.gauge('nginx.http.conn.idle', connections.get('idle'))
        self.object.statsd.gauge('nginx.http.conn.current',
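                                 # the API reports active and idle separately;
                                 # "current" is derived as their sum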
                                 connections.get('active') + connections.get('idle'))
        self.object.statsd.gauge('nginx.http.request.current', requests.get('current'))

        # counters
        counted_vars = {
            'nginx.http.request.count': requests.get('total'),
            'nginx.http.conn.accepted': connections.get('accepted'),
            'nginx.http.conn.dropped': connections.get('dropped'),
            'plus.http.ssl.handshakes': ssl.get('handshakes'),
            'plus.http.ssl.failed': ssl.get('handshakes_failed'),
            'plus.http.ssl.reuses': ssl.get('session_reuses'),
            'plus.proc.respawned': processes.get('respawned')
        }
        self.aggregate_counters(counted_vars, stamp=stamp)

        caches = http.get('caches', {})
        for cache in caches.values():
            for method in api_http_cache.CACHE_COLLECT_INDEX:
                method(self, cache, stamp)

        http_server_zones = http.get('server_zones', {})
        for server_zone in http_server_zones.values():
            for method in api_http_server_zone.STATUS_ZONE_COLLECT_INDEX:
                method(self, server_zone, stamp)

        http_upstreams = http.get('upstreams', {})
        for upstream in http_upstreams.values():
            for peer in upstream.get('peers', []):
                for method in api_http_upstream.UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in api_http_upstream.UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        slabs = aggregated_api_payload.get('slabs', {})
        for slab in slabs.values():
            for method in api_slab.SLAB_COLLECT_INDEX:
                method(self, slab, stamp)

        stream_server_zones = stream.get('server_zones', {})
        for server_zone in stream_server_zones.values():
            for method in api_stream_server_zone.STREAM_COLLECT_INDEX:
                method(self, server_zone, stamp)

        stream_upstreams = stream.get('upstreams', {})
        for upstream in stream_upstreams.values():
            for peer in upstream.get('peers', []):
                for method in api_stream_upstream.STREAM_UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in api_stream_upstream.STREAM_UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        self.increment_counters()
        self.finalize_latest()

    def workers_rlimit_nofile(self):
        """
        nginx.workers.rlimit_nofile

        sum of the hard limits (the second rlimit value) across all workers
        """
        rlimit = 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                rlimit += p.rlimit_nofile()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.gauge('nginx.workers.rlimit_nofile', rlimit)

    def workers_io(self):
        """
        io

        nginx.workers.io.kbs_r
        nginx.workers.io.kbs_w
        """
        # collect raw data
        read, write = 0, 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                io = p.io_counters()
                read += io.read_bytes
                write += io.write_bytes
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        current_stamp = int(time.time())

        # kilobytes!
        read /= 1024
        write /= 1024

        # get deltas and store metrics
        for metric_name, value in {'nginx.workers.io.kbs_r': read, 'nginx.workers.io.kbs_w': write}.items():
            prev_stamp, prev_value = self.previous_counters.get(metric_name, (None, None))
            if isinstance(prev_value, (int, float, complex)) and prev_stamp and prev_stamp != current_stamp:
                value_delta = value - prev_value
                self.object.statsd.incr(metric_name, value_delta)
            self.previous_counters[metric_name] = (current_stamp, value)


class GenericLinuxNginxMetricsCollector(NginxMetricsCollector):
    pass


class DebianNginxMetricsCollector(NginxMetricsCollector):
    pass


class CentosNginxMetricsCollector(NginxMetricsCollector):
    pass


class GentooNginxMetricsCollector(NginxMetricsCollector):
    pass


class FreebsdNginxMetricsCollector(NginxMetricsCollector):
    def workers_fds_count(self):
        """
        counting worker fds doesn't work on FreeBSD, so the metric is skipped
        """
        pass
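# A minimal usage sketch (assumptions, not part of this module: `nginx_obj`
# is a managed nginx object from the agent's object tree exposing `workers`,
# `statsd`, `eventd`, etc., AbstractMetricsCollector accepts it via the
# `object` kwarg, and collect() runs the methods registered in __init__):
#
#     collector = NginxMetricsCollector(object=nginx_obj)
#     collector.collect()  # one cycle: gauges plus counter deltas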