# -*- coding: utf-8 -*-
import copy
import time

from amplify.agent.collectors.abstract import AbstractCollector
from amplify.agent.common.context import context
from amplify.agent.objects.nginx.log.access import NginxAccessLogParser
from amplify.agent.pipelines.abstract import Pipeline

__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"


class NginxAccessLogsCollector(AbstractCollector):
    """
    Parses nginx access log lines (tailed from a file or a syslog Pipeline)
    and emits the derived metrics through ``self.object.statsd``.

    Each ``nginx.*`` metric handler registered in ``__init__`` receives the
    dict produced by NginxAccessLogParser plus the list of custom filters
    that matched the parsed line.
    """

    short_name = 'nginx_alog'

    # metric name -> access-log variable the metric is derived from.
    # None marks metrics that are computed by this collector rather than
    # read from a single parsed field.
    counters = {
        'nginx.http.method.head': 'request_method',
        'nginx.http.method.get': 'request_method',
        'nginx.http.method.post': 'request_method',
        'nginx.http.method.put': 'request_method',
        'nginx.http.method.delete': 'request_method',
        'nginx.http.method.options': 'request_method',
        'nginx.http.method.other': 'request_method',
        'nginx.http.status.1xx': 'status',
        'nginx.http.status.2xx': 'status',
        'nginx.http.status.3xx': 'status',
        'nginx.http.status.4xx': 'status',
        'nginx.http.status.403': 'status',
        'nginx.http.status.404': 'status',
        'nginx.http.status.5xx': 'status',
        'nginx.http.status.500': 'status',
        'nginx.http.status.502': 'status',
        'nginx.http.status.503': 'status',
        'nginx.http.status.504': 'status',
        'nginx.http.status.discarded': 'status',
        'nginx.http.v0_9': 'server_protocol',
        'nginx.http.v1_0': 'server_protocol',
        'nginx.http.v1_1': 'server_protocol',
        'nginx.http.v2': 'server_protocol',
        'nginx.http.request.body_bytes_sent': 'body_bytes_sent',
        'nginx.http.request.bytes_sent': 'bytes_sent',
        'nginx.upstream.status.1xx': 'upstream_status',
        'nginx.upstream.status.2xx': 'upstream_status',
        'nginx.upstream.status.3xx': 'upstream_status',
        'nginx.upstream.status.4xx': 'upstream_status',
        'nginx.upstream.status.5xx': 'upstream_status',
        'nginx.cache.bypass': 'upstream_cache_status',
        'nginx.cache.expired': 'upstream_cache_status',
        'nginx.cache.hit': 'upstream_cache_status',
        'nginx.cache.miss': 'upstream_cache_status',
        'nginx.cache.revalidated': 'upstream_cache_status',
        'nginx.cache.stale': 'upstream_cache_status',
        'nginx.cache.updating': 'upstream_cache_status',
        'nginx.upstream.next.count': None,
        'nginx.upstream.request.count': None
    }

    # any other request method is reported as nginx.http.method.other
    valid_http_methods = (
        'head',
        'get',
        'post',
        'put',
        'delete',
        'options'
    )

    # any other $upstream_cache_status value is ignored
    valid_cache_statuses = (
        'bypass',
        'expired',
        'hit',
        'miss',
        'revalidated',
        'stale',
        'updating',
    )

    def __init__(self, log_format=None, tail=None, **kwargs):
        """
        :param log_format: str raw nginx log_format used to build the parser
        :param tail: line source -- a Pipeline for syslog, otherwise any
                     iterable of log lines
        """
        super(NginxAccessLogsCollector, self).__init__(**kwargs)
        self.parser = NginxAccessLogParser(log_format)
        self.num_of_lines_in_log_format = self.parser.raw_format.count('\n') + 1
        self.tail = tail

        # syslog tail names look like "prefix:name" -- keep the last part
        self.name = tail.name.split(':')[-1] if isinstance(tail, Pipeline) \
            else None

        self.filters = []
        # skip empty filters and filters for other log file
        for log_filter in self.object.filters:
            if log_filter.empty:
                continue
            if not log_filter.matchfile(self.name):
                continue
            self.filters.append(log_filter)

        self.register(
            self.http_method,
            self.http_status,
            self.http_version,
            self.request_length,
            self.body_bytes_sent,
            self.bytes_sent,
            self.gzip_ration,
            self.request_time,
            self.upstreams,
        )

    def init_counters(self, counters=None):
        """
        Sets every applicable counter to zero so that periods without
        matching traffic still report 0 instead of a gap.

        :param counters: unused; kept for signature compatibility
        """
        for counter, key in self.counters.items():
            # If keys are in the parser format (access log) or not defined (error log)
            if key in self.parser.keys or key is None:
                self.object.statsd.incr(counter, value=0)

        # init counters for custom filters
        for counter in set(f.metric for f in self.filters):
            if counter in self.counters:
                self.count_custom_filter(self.filters, counter, 0, self.object.statsd.incr)

    def collect(self):
        """
        Drains the tail, parses each (possibly multiline) record and
        dispatches the parsed data to every registered metric handler.
        """
        self.init_counters()  # set all counters to 0

        count = 0
        multiline_record = []
        for line in self.tail:
            count += 1

            # release GIL every 1000 of lines
            if count % (1000 * self.num_of_lines_in_log_format) == 0:
                time.sleep(0.001)

            # handle multiline log formats: accumulate physical lines until
            # a full record is available, then parse them as one unit
            if self.num_of_lines_in_log_format > 1:
                multiline_record.append(line)
                if len(multiline_record) < self.num_of_lines_in_log_format:
                    continue
                else:
                    line = '\n'.join(multiline_record)
                    multiline_record = []

            try:
                parsed = self.parser.parse(line)
            except Exception:
                # was a bare "except:"; narrowed so KeyboardInterrupt and
                # SystemExit are no longer swallowed -- parse failures are
                # still best-effort: log and skip the line
                context.log.debug('could not parse line %r' % line, exc_info=True)
                parsed = None

            if not parsed:
                continue

            if parsed['malformed']:
                self.request_malformed()
            else:
                # try to match custom filters and collect log metrics with them
                matched_filters = [log_filter for log_filter in self.filters if log_filter.match(parsed)]
                super(NginxAccessLogsCollector, self).collect(parsed, matched_filters)

        tail_name = self.tail.name if isinstance(self.tail, Pipeline) else 'list'
        context.log.debug('%s processed %s lines from %s' % (self.object.definition_hash, count, tail_name))

    def request_malformed(self):
        """
        nginx.http.request.malformed
        """
        self.object.statsd.incr('nginx.http.request.malformed')

    def http_method(self, data, matched_filters=None):
        """
        nginx.http.method.head
        nginx.http.method.get
        nginx.http.method.post
        nginx.http.method.put
        nginx.http.method.delete
        nginx.http.method.options
        nginx.http.method.other

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'request_method' in data:
            method = data['request_method'].lower()
            # anything outside the known verbs is bucketed as "other"
            method = method if method in self.valid_http_methods else 'other'
            metric_name = 'nginx.http.method.%s' % method
            self.object.statsd.incr(metric_name)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

    def http_status(self, data, matched_filters=None):
        """
        nginx.http.status.1xx
        nginx.http.status.2xx
        nginx.http.status.3xx
        nginx.http.status.4xx
        nginx.http.status.403
        nginx.http.status.404
        nginx.http.status.5xx
        nginx.http.status.500
        nginx.http.status.502
        nginx.http.status.503
        nginx.http.status.504
        nginx.http.status.discarded

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'status' in data:
            metrics_to_populate = []
            http_status = data['status']

            # add separate metrics for specific 4xx and 5xx codes
            if http_status.startswith('4'):
                if http_status in ('403', '404'):
                    metrics_to_populate.append('nginx.http.status.%s' % http_status)
            elif http_status.startswith('5'):
                if http_status in ('500', '502', '503', '504'):
                    metrics_to_populate.append('nginx.http.status.%s' % http_status)

            # the per-class bucket (e.g. nginx.http.status.4xx) is always counted
            metrics_to_populate.append('nginx.http.status.%sxx' % http_status[0])

            for metric_name in metrics_to_populate:
                self.object.statsd.incr(metric_name)
                if matched_filters:
                    self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

            # 499 = client closed the connection before nginx responded
            if data['status'] == '499':
                metric_name = 'nginx.http.status.discarded'
                self.object.statsd.incr(metric_name)
                if matched_filters:
                    self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

    def http_version(self, data, matched_filters=None):
        """
        nginx.http.v0_9
        nginx.http.v1_0
        nginx.http.v1_1
        nginx.http.v2

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'server_protocol' in data:
            proto = data['server_protocol']
            if not proto.startswith('HTTP'):
                return

            version = proto.split('/')[-1]
            # Ordered roughly by expected popularity to reduce number of calls to `startswith`
            if version.startswith('1.1'):
                suffix = '1_1'
            elif version.startswith('2.0'):
                suffix = '2'
            elif version.startswith('1.0'):
                suffix = '1_0'
            elif version.startswith('0.9'):
                suffix = '0_9'
            else:
                suffix = version.replace('.', '_')

            metric_name = 'nginx.http.v%s' % suffix
            self.object.statsd.incr(metric_name)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

    def request_length(self, data, matched_filters=None):
        """
        nginx.http.request.length

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'request_length' in data:
            metric_name, value = 'nginx.http.request.length', data['request_length']
            self.object.statsd.average(metric_name, value)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.average)

    def body_bytes_sent(self, data, matched_filters=None):
        """
        nginx.http.request.body_bytes_sent

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'body_bytes_sent' in data:
            metric_name, value = 'nginx.http.request.body_bytes_sent', data['body_bytes_sent']
            self.object.statsd.incr(metric_name, value)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.incr)

    def bytes_sent(self, data, matched_filters=None):
        """
        nginx.http.request.bytes_sent

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'bytes_sent' in data:
            metric_name, value = 'nginx.http.request.bytes_sent', data['bytes_sent']
            self.object.statsd.incr(metric_name, value)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.incr)

    def gzip_ration(self, data, matched_filters=None):
        """
        nginx.http.gzip.ratio

        NOTE(review): method name looks like a typo for "gzip_ratio", but it
        is kept as-is for interface compatibility.

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'gzip_ratio' in data:
            metric_name, value = 'nginx.http.gzip.ratio', data['gzip_ratio']
            self.object.statsd.average(metric_name, value)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.average)

    def request_time(self, data, matched_filters=None):
        """
        nginx.http.request.time
        nginx.http.request.time.median
        nginx.http.request.time.max
        nginx.http.request.time.pctl95
        nginx.http.request.time.count

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        if 'request_time' in data:
            # request_time is parsed as a list of floats; report their sum
            metric_name, value = 'nginx.http.request.time', sum(data['request_time'])
            self.object.statsd.timer(metric_name, value)
            if matched_filters:
                self.count_custom_filter(
                    self.create_parent_filters(matched_filters, parent_metric=metric_name),
                    metric_name,
                    value,
                    self.object.statsd.timer
                )

    def upstreams(self, data, matched_filters=None):
        """
        nginx.cache.bypass
        nginx.cache.expired
        nginx.cache.hit
        nginx.cache.miss
        nginx.cache.revalidated
        nginx.cache.stale
        nginx.cache.updating
        nginx.upstream.request.count
        nginx.upstream.next.count
        nginx.upstream.connect.time
        nginx.upstream.connect.time.median
        nginx.upstream.connect.time.max
        nginx.upstream.connect.time.pctl95
        nginx.upstream.connect.time.count
        nginx.upstream.header.time
        nginx.upstream.header.time.median
        nginx.upstream.header.time.max
        nginx.upstream.header.time.pctl95
        nginx.upstream.header.time.count
        nginx.upstream.response.time
        nginx.upstream.response.time.median
        nginx.upstream.response.time.max
        nginx.upstream.response.time.pctl95
        nginx.upstream.response.time.count
        nginx.upstream.status.1xx
        nginx.upstream.status.2xx
        nginx.upstream.status.3xx
        nginx.upstream.status.4xx
        nginx.upstream.status.5xx
        nginx.upstream.response.length

        :param data: {} of parsed line
        :param matched_filters: [] of matched filters
        """
        # bail out unless at least one upstream_* field carries a real value
        # ('-' and '' mean "no upstream was involved")
        if not any(key.startswith('upstream') and data[key] not in ('-', '') for key in data):
            return

        # counters
        upstream_response = False
        if 'upstream_status' in data:
            for status in data['upstream_status']:  # upstream_status is parsed as a list
                if status.isdigit():
                    suffix = '%sxx' % status[0]
                    metric_name = 'nginx.upstream.status.%s' % suffix
                    # Set flag for upstream length processing
                    upstream_response = suffix in ('2xx', '3xx')
                    self.object.statsd.incr(metric_name)
                    if matched_filters:
                        self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

        if upstream_response and 'upstream_response_length' in data:
            metric_name, value = 'nginx.upstream.response.length', data['upstream_response_length']
            self.object.statsd.average(metric_name, value)
            if matched_filters:
                self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.average)

        # gauges
        upstream_switches = None
        for metric_name, key_name in {
            'nginx.upstream.connect.time': 'upstream_connect_time',
            'nginx.upstream.response.time': 'upstream_response_time',
            'nginx.upstream.header.time': 'upstream_header_time'
        }.items():
            if key_name in data:
                values = data[key_name]

                # set upstream switches one time
                if len(values) > 1 and upstream_switches is None:
                    upstream_switches = len(values) - 1

                # store all values
                value = sum(values)
                self.object.statsd.timer(metric_name, value)
                if matched_filters:
                    self.count_custom_filter(
                        self.create_parent_filters(matched_filters, parent_metric=metric_name),
                        metric_name,
                        value,
                        self.object.statsd.timer
                    )

        # log upstream switches
        metric_name, value = 'nginx.upstream.next.count', 0 if upstream_switches is None else upstream_switches
        self.object.statsd.incr(metric_name, value)
        if matched_filters:
            self.count_custom_filter(matched_filters, metric_name, value, self.object.statsd.incr)

        # cache
        if 'upstream_cache_status' in data:
            cache_status = data['upstream_cache_status']
            cache_status_lower = cache_status.lower()
            if cache_status_lower in self.valid_cache_statuses:
                metric_name = 'nginx.cache.%s' % cache_status_lower
                self.object.statsd.incr(metric_name)
                if matched_filters:
                    self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

        # log total upstream requests
        metric_name = 'nginx.upstream.request.count'
        self.object.statsd.incr(metric_name)
        if matched_filters:
            self.count_custom_filter(matched_filters, metric_name, 1, self.object.statsd.incr)

    @staticmethod
    def create_parent_filters(original_filters, parent_metric):
        """
        median, max, pctl95, and count are created in statsd.flush().
        So if a filter on nginx.upstream.response.time.median is created,
        the filter metric should be truncated to nginx.upstream.response.time

        :param original_filters: [] of filters to inspect
        :param parent_metric: str parent metric the surviving filters are re-pointed at
        :return: [] of deep-copied filters whose metric is set to parent_metric
        """
        parent_filters = []
        for original_filter in original_filters:
            if parent_metric not in original_filter.metric:
                continue
            # deep-copy so the original filter's metric is left untouched
            parent_filter = copy.deepcopy(original_filter)
            parent_filter.metric = parent_metric
            parent_filters.append(parent_filter)
        return parent_filters

    @staticmethod
    def count_custom_filter(matched_filters, metric_name, value, method):
        """
        Collect custom metric

        :param matched_filters: [] of matched filters
        :param metric_name: str metric name
        :param value: int/float value
        :param method: function to call
        :return:
        """
        for log_filter in matched_filters:
            if log_filter.metric == metric_name:
                full_metric_name = '%s||%s' % (log_filter.metric, log_filter.filter_rule_id)
                method(full_metric_name, value)