nginx-amplify-agent/amplify/agent/objects/nginx/config/config.py


# -*- coding: utf-8 -*-
import hashlib
import json
import os
import time
import rstr
from crossplane.lexer import _iterescape
from amplify.agent.common.context import context
from amplify.agent.common.util import subp
from amplify.agent.common.util.glib import glib
from amplify.agent.common.util.ssl import ssl_analysis
from amplify.agent.objects.nginx.binary import nginx_v
from amplify.agent.objects.nginx.config.parser import NginxConfigParser, get_filesystem_info
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
ERROR_LOG_LEVELS = (
'debug',
'info',
'notice',
'warn',
'error',
'crit',
'alert',
'emerg'
)
def _enquote(arg):
    # quote args that are empty or contain (possibly escaped) whitespace;
    # note: str.decode('string_escape') is Python 2 only
    if not arg or any(char.isspace() for char in _iterescape(arg)):
        return repr(arg).decode('string_escape')
    else:
        return arg
class NginxConfig(object):
"""
Nginx config representation **for a running NGINX instance**
Main tasks:
- find all log formats
- find all access logs
- find all error logs
- find stub_status url
"""
def __init__(self, filename, binary=None, prefix=None):
self.filename = filename
self.binary = binary
self.prefix = prefix
self.log_formats = {}
self.access_logs = {}
self.error_logs = {}
self.test_errors = []
self.tree = {}
self.files = {}
self.directories = {}
self.directory_map = {}
self.subtree = []
self.ssl_certificates = {}
self.parser_ssl_certificates = []
self.parser_errors = []
self.stub_status_urls = []
self.plus_status_external_urls = []
self.plus_status_internal_urls = []
self.api_external_urls = []
self.api_internal_urls = []
self.parser = None
self.wait_until = 0
def _setup_parser(self):
self.parser = NginxConfigParser(filename=self.filename)
def _teardown_parser(self):
self.parser = None
def full_parse(self, include_ssl_certs=True):
context.log.debug('parsing full tree of %s' % self.filename)
# parse raw data
try:
self._setup_parser()
self.parser.parse(include_ssl_certs=include_ssl_certs)
self._handle_parse()
except Exception as e:
context.log.error('failed to parse config at %s (due to %s)' % (self.filename, e.__class__.__name__))
context.log.debug('additional info:', exc_info=True)
self._setup_parser() # Re-init parser to discard partial data (if any)
# Post-handling
# try to add logs from nginx -V configure options
self.add_configured_variable_logs()
# try to locate and use default logs (PREFIX/logs/*)
self.add_default_logs()
        # Go through log files and apply exclude rules (log files are added during ._collect_data())
self._exclude_logs()
# try to read from each log file to check if it can be parsed
self._check_logs()
# dump access log files, access log formats, and error log files to the debug log
context.log.debug(
'parsed log formats, access logs, and error logs:' +
'\nlog formats: ' + json.dumps(self.log_formats, indent=4, sort_keys=True) +
'\naccess logs: ' + json.dumps(self.access_logs, indent=4, sort_keys=True) +
'\nerror logs: ' + json.dumps(self.error_logs, indent=4, sort_keys=True)
)
def _handle_parse(self):
self.tree = self.parser.tree
self.files = self.parser.files
self.directories = self.parser.directories
self.directory_map = self.parser.directory_map
self.subtree = self.parser.simplify()
self.ssl_certificates = {} # gets populated in run_ssl_analysis()
self.parser_ssl_certificates = self.parser.ssl_certificates
self.parser_errors = self.parser.errors
# now that we have all the things we need from parser, we can tear it down
self._teardown_parser()
# clear url values in the config that can/will be used to find metrics
# do this now because self._collect_data() will repopulate the lists
self.stub_status_urls = []
self.plus_status_external_urls = []
self.plus_status_internal_urls = []
self.api_external_urls = []
self.api_internal_urls = []
# go through and collect all logical data
self._collect_data(self.subtree)
def collect_structure(self, include_ssl_certs=False):
"""
        Goes through all files (light-parses includes) and collects their mtimes
        :param include_ssl_certs: bool - include ssl certs or not
        :return: (dict, dict) - files and directories
"""
# if self.parser is None, set it up
if self.parser is None:
self._setup_parser()
files, directories = self.parser.get_structure(include_ssl_certs=include_ssl_certs)
        context.log.debug('found %s files for %s' % (len(files), self.filename))
        context.log.debug('found %s directories for %s' % (len(directories), self.filename))
# always teardown the parser
self._teardown_parser()
return files, directories
def total_size(self):
"""
Returns the total size of a config tree
:return: int size in bytes
"""
return sum(data['size'] for data in self.files.values())
def _collect_data(self, block, ctx=None):
"""
        Searches for the data we need in the config tree
:param block: list of statement dicts to parse
:param ctx: dict with context
"""
ctx = ctx if ctx is not None else {}
def usable_log_args(args):
is_disabled = not args or args[0] == 'off'
uses_variable = any('$' in arg for arg in args if not arg.startswith('if='))
return not is_disabled and not uses_variable
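        # For example, the helper behaves as sketched here:
        #   usable_log_args(['off'])                               -> False  (logging disabled)
        #   usable_log_args(['/var/log/nginx/$host.log'])          -> False  (uses a runtime variable)
        #   usable_log_args(['/var/log/nginx/access.log', 'main']) -> True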
for stmt in block:
directive = stmt['directive']
args = stmt['args']
if directive == 'error_log' and usable_log_args(args):
path = args[0].replace('"', '').replace("'", '')
                # if not syslog, assume it is a file; if it doesn't start with '/', assume a relative path
if not path.startswith('syslog') and not path.startswith('/'):
path = os.path.join(self.prefix, path)
if path not in self.error_logs:
if len(args) > 1 and args[1] in ERROR_LOG_LEVELS:
self.error_logs[path] = {'log_level': args[1]}
else:
self.error_logs[path] = {'log_level': 'error'} # nginx default log level
elif directive == 'access_log' and usable_log_args(args):
path = args[0].replace('"', '').replace("'", '')
                # if not syslog, assume it is a file; if it doesn't start with '/', assume a relative path
if not path.startswith('syslog') and not path.startswith('/'):
path = os.path.join(self.prefix, path)
                log_format = args[1] if len(args) > 1 else None
                self.access_logs[path] = {'log_format': log_format}
elif directive == 'log_format':
name, strings = args[0], args[1:]
# disregard the (optional) escape parameter
if len(strings) > 1 and strings[0].startswith('escape='):
strings.pop(0)
self.log_formats[name] = ''.join(
x.encode('utf-8').decode('unicode_escape') for x in strings
)
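                # For example, a config line like this (a sketch):
                #   log_format main '$remote_addr [$time_local] ' '"$request" $status';
                # stores the joined, unescaped string under self.log_formats['main']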
elif directive == 'server' and 'upstream' not in ctx:
listens = []
for inner_stmt in stmt['block']:
if inner_stmt['directive'] == 'listen':
listens.append(inner_stmt['args'][0])
                if not listens:
                    # nginx defaults: *:80, or *:8000 when running unprivileged
                    listens += ['80', '8000']
ip_port = []
for listen in listens:
try:
ip_port.append(self._parse_listen(listen))
                    except Exception:
                        context.log.error('failed to parse listen directive: %s' % listen)
context.log.debug('additional info:', exc_info=True)
server_ctx = dict(ctx, ip_port=ip_port)
for inner_stmt in stmt['block']:
if inner_stmt['directive'] == 'server_name':
server_ctx['server_name'] = inner_stmt['args'][0]
break
for inner_stmt in stmt['block']:
if inner_stmt['directive'] == 'listen':
server_ctx['server_schema'] = 'https' if 'ssl' in inner_stmt['args'] else 'http'
break
self._collect_data(stmt['block'], ctx=server_ctx)
elif directive == 'upstream':
upstream = args[0]
upstream_ctx = dict(ctx, upstream=upstream)
self._collect_data(stmt['block'], ctx=upstream_ctx)
elif directive == 'location':
location = ' '.join(map(_enquote, args))
location_ctx = dict(ctx, location=location)
self._collect_data(stmt['block'], ctx=location_ctx)
elif directive == 'stub_status' and 'ip_port' in ctx:
for url in self._status_url(ctx):
if url not in self.stub_status_urls:
self.stub_status_urls.append(url)
elif (directive == 'status' or self._is_plus_dashboard(stmt, ctx)) and 'ip_port' in ctx:
# use different url builders for external and internal urls
for url in self._status_url(ctx, server_preferred=True):
if url not in self.plus_status_external_urls:
self.plus_status_external_urls.append(url)
# for internal (agent) usage local ip address is a better choice,
# because the external url might not be accessible from a host
for url in self._status_url(ctx, server_preferred=False):
if url not in self.plus_status_internal_urls:
self.plus_status_internal_urls.append(url)
elif directive == 'api' and 'ip_port' in ctx:
# use different url builders for external and internal urls
for url in self._status_url(ctx, server_preferred=True):
if url not in self.api_external_urls:
self.api_external_urls.append(url)
# for internal (agent) usage local ip address is a better choice,
# because the external url might not be accessible from a host
for url in self._status_url(ctx, server_preferred=False):
if url not in self.api_internal_urls:
self.api_internal_urls.append(url)
elif 'block' in stmt:
self._collect_data(stmt['block'], ctx=ctx)
@staticmethod
def _is_plus_dashboard(stmt, ctx):
"""
        Now that the `status` directive is deprecated, this method is used to determine
plus dashboard urls. It does so by checking to see if the config follows the
conventional pattern for including the plus dashboard:
location = /dashboard.html {
root /usr/share/nginx/html;
}
Obviously this is not perfect, but it's the best we can do now that the `status`
directive is gone.
"""
correct_directive = stmt['directive'] == 'root'
correct_arguments = stmt['args'] == ['/usr/share/nginx/html']
correct_location = ctx.get('location', '/').endswith('dashboard.html')
return correct_directive and correct_arguments and correct_location
@staticmethod
def _status_url(ctx, server_preferred=False):
"""
Creates stub/plus status url based on context
:param ctx: {} of current parsing context
:param server_preferred: bool - use server_name instead of listen
:return: [] of urls
"""
location = ctx.get('location', '/')
# remove all modifiers
location_parts = location.split(' ')
final_location_part = location_parts[-1]
        # generate a random string that matches the regex location
if location.startswith('~'):
try:
exact_location = rstr.xeger(final_location_part)
# check that regex location has / and add it
if not exact_location.startswith('/'):
exact_location = '/%s' % exact_location
            except Exception:
context.log.debug('bad regex location: %s' % final_location_part)
exact_location = None
else:
exact_location = final_location_part
            # an exact location that doesn't start with '/' is not usable, so skip it
if not exact_location.startswith('/'):
context.log.debug('bad exact location: %s' % final_location_part)
exact_location = None
if exact_location:
            for address, port in ctx.get('ip_port', []):
if server_preferred and 'server_name' in ctx:
address = ctx['server_name']
schema = 'http'
if 'server_schema' in ctx:
schema = ctx['server_schema']
yield '%s://%s:%s%s' % (schema, address, port, exact_location)
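    # For example, _status_url() with a context like this (hypothetical values,
    # a sketch) yields 'http://127.0.0.1:80/basic_status':
    #   ctx = {'ip_port': [('127.0.0.1', '80')], 'location': '= /basic_status'}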
def run_test(self):
"""
Tests the configuration using nginx -t
Saves event info if syntax check was not successful
"""
        start_time = time.time()
        if self.binary:
            context.log.info('running %s -t -c %s' % (self.binary, self.filename))
try:
_, nginx_t_err = subp.call("%s -t -c %s" % (self.binary, self.filename), check=False)
for line in nginx_t_err:
if 'syntax is' in line and 'syntax is ok' not in line:
self.test_errors.append(line)
except Exception as e:
exception_name = e.__class__.__name__
                context.log.error('failed to run %s -t -c %s due to %s' % (self.binary, self.filename, exception_name))
context.log.debug('additional info:', exc_info=True)
end_time = time.time()
return end_time - start_time
def checksum(self):
"""
Calculates total checksum of all config files, certificates and permissions
:return: str checksum
"""
checksums = []
        for file_path, file_data in self.files.items():
            with open(file_path, 'rb') as f:
                checksums.append(hashlib.sha256(f.read()).hexdigest())
            checksums.append(file_data['permissions'])
            checksums.append(str(file_data['mtime']))
        for dir_data in self.directories.values():
            checksums.append(dir_data['permissions'])
            checksums.append(str(dir_data['mtime']))
        for cert in self.ssl_certificates:
            with open(cert, 'rb') as f:
                checksums.append(hashlib.sha256(f.read()).hexdigest())
return hashlib.sha256('.'.join(checksums).encode('utf-8')).hexdigest()
def _parse_listen(self, listen):
"""
        Parses a listen directive value and returns an (address, port) tuple, e.g. ('127.0.0.1', '80')
        :param listen: str raw listen
        :return: (str, str) address and port
"""
if '[' in listen:
# ipv6
parts = list(filter(len, listen.rsplit(']', 1)))
address = '%s]' % parts[0]
port = '80' if len(parts) == 1 else parts[1].split(':')[1]
else:
# ipv4
parts = list(filter(len, listen.rsplit(':', 1)))
if len(parts) == 1 and parts[0].isdigit():
address, port = '*', parts[0]
elif len(parts) == 1:
address, port = parts[0], '80'
else:
address, port = parts
# standardize address
if address in ('*', '0.0.0.0'):
address = '127.0.0.1'
elif address == '[::]':
address = '[::1]'
return address, port
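    # For example, _parse_listen() standardizes addresses as sketched here:
    #   _parse_listen('8080')           -> ('127.0.0.1', '8080')  # bare port listens on '*'
    #   _parse_listen('192.0.2.1:8080') -> ('192.0.2.1', '8080')
    #   _parse_listen('[::]:8000')      -> ('[::1]', '8000')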
def add_configured_variable_logs(self):
"""
        Gets log paths from the nginx -V configure options and tries to add them as access and error logs
        This happens only when no access or error logs were found in the nginx config files
"""
if self.binary is not None and (len(self.access_logs) < 1 or len(self.error_logs) < 1):
try:
v_options = nginx_v(self.binary)
configure = v_options['configure']
# adding access or error logs from options only if they are empty
if len(self.access_logs) < 1:
access_log_path = configure.get('http-log-path')
                    if access_log_path is not None and os.path.isfile(access_log_path):
self.access_logs[access_log_path] = {'log_format': None}
if len(self.error_logs) < 1:
error_log_path = configure.get('error-log-path')
                    if error_log_path is not None and os.path.isfile(error_log_path):
self.error_logs[error_log_path] = {'log_level': 'error'}
except Exception as e:
exception_name = e.__class__.__name__
context.log.error(
'failed to get configured variables from %s -V due to %s' % (self.binary, exception_name))
context.log.debug('additional info:', exc_info=True)
def add_default_logs(self):
"""
        By default nginx uses logs placed in the --prefix/logs/ directory
This method tries to find and add them
"""
access_log_path = '%s/logs/access.log' % self.prefix
if os.path.isfile(access_log_path) and access_log_path not in self.access_logs:
self.access_logs[access_log_path] = {'log_format': None}
error_log_path = '%s/logs/error.log' % self.prefix
if os.path.isfile(error_log_path) and error_log_path not in self.error_logs:
self.error_logs[error_log_path] = {'log_level': 'error'}
def run_ssl_analysis(self):
"""
        Iterate over the list of ssl_certificate definitions and run ssl_analysis to construct a dictionary mapping
        each ssl_certificate path to the results of ssl_analysis.
:return: float run time
"""
if not self.parser_ssl_certificates:
return
start_time = time.time()
for cert_filename in set(self.parser_ssl_certificates):
ssl_analysis_result = ssl_analysis(cert_filename)
if ssl_analysis_result:
self.ssl_certificates[cert_filename] = ssl_analysis_result
end_time = time.time()
return end_time - start_time
def _exclude_logs(self):
"""
Iterate through log file stores and remove ones that match exclude rules.
"""
# Take comma-separated string of pathname patterns and separate them into individual patterns
exclude_rules = context.app_config.get('nginx', {}).get('exclude_logs', '').split(',')
for rule in [x for x in exclude_rules if x]: # skip potentially empty rules due to improper formatting
# access logs
for excluded_file in glib(self.access_logs.keys(), rule):
del self.access_logs[excluded_file]
# error logs
for excluded_file in glib(self.error_logs.keys(), rule):
del self.error_logs[excluded_file]
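    # For example, an agent config like the following (hypothetical patterns,
    # assuming glib() performs glob-style matching) drops every collected
    # access/error log whose path matches one of the patterns:
    #   [nginx]
    #   exclude_logs = /var/log/nginx/internal*.log,*.swp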
def _check_logs(self):
"""
        Iterate through log file stores and add permissions and readability info to the log data
"""
for logs in (self.access_logs, self.error_logs):
for log_name in filter(lambda name: not name.startswith('syslog'), logs):
info = get_filesystem_info(log_name)
logs[log_name]['permissions'] = info['permissions']
try:
with open(log_name, 'r'):
pass
                except Exception:
logs[log_name]['readable'] = False
else:
logs[log_name]['readable'] = True
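    # After this runs, each non-syslog log entry carries the extra keys, roughly
    # (a sketch; the exact permissions format comes from get_filesystem_info()):
    #   self.access_logs['/var/log/nginx/access.log']
    #   # -> {'log_format': None, 'permissions': '0640', 'readable': True}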