nginx-amplify-agent/amplify/agent/objects/nginx/config/parser.py


# -*- coding: utf-8 -*-
import fnmatch
import glob
import os
import re
import sys
import crossplane
try:
    from os import scandir, walk
except ImportError:
    from scandir import scandir, walk
from amplify.agent.common.context import context
__author__ = 'Arie van Luttikhuizen'
__copyright__ = 'Copyright (C) Nginx, Inc. All rights reserved.'
__license__ = ''
__maintainer__ = 'Arie van Luttikhuizen'
__email__ = 'arie@nginx.com'
# these regular expressions are used for light-weight parsing
INCLUDE_ONLY_RE = re.compile(r'(?:^|[;{}])\s*(include)\s+([\'"]?)([^#]*?)\2\s*?(?=;)')
INCLUDE_CERT_RE = re.compile(r'(?:^|[;{}])\s*(include|ssl_certificate)\s+([\'"]?)([^#]*?)\2\s*?(?=;)')
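# illustrative examples (not in the original source): against a line such as
#     "    include /etc/nginx/conf.d/*.conf;"
# both regexes match with group(1) == 'include' and group(3) == '/etc/nginx/conf.d/*.conf';
# INCLUDE_CERT_RE additionally matches lines like "ssl_certificate /etc/ssl/example.pem;"
# (the .pem path is hypothetical).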
IGNORED_DIRECTIVES = [] if context.agent_name == 'controller' else frozenset([
    'ssl_certificate_key',
    'ssl_client_certificate',
    'ssl_password_file',
    'ssl_stapling_file',
    'ssl_trusted_certificate',
    'auth_basic_user_file',
    'secure_link_secret'
])


def get_filesystem_info(path):
    size, mtime, permissions = 0, 0, '0000'
    try:
        info = os.stat(path)
        size = info.st_size
        mtime = int(info.st_mtime)
        # format permissions as a four-digit octal string, e.g. '0644'
        permissions = format(info.st_mode & 0o777, '04o')
    except Exception as e:
        exc_cls = e.__class__.__name__
        message = 'failed to stat %s due to %s' % (path, exc_cls)
        context.log.debug(message, exc_info=True)
    finally:
        return {'size': size, 'mtime': mtime, 'permissions': permissions}
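# illustrative sketch only: for a readable path this returns something like
# {'size': 1482, 'mtime': 1700000000, 'permissions': '0644'} (values are hypothetical);
# on a stat() failure the zeroed defaults are returned instead.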


def _fnmatch_pattern(names, pttn):
    if glob.has_magic(pttn):
        return fnmatch.filter(names, pttn)
    else:
        return [pttn] if pttn in names else []


def _iglob_pattern(pattern):
    if glob.has_magic(pattern):
        for path in glob.iglob(pattern):
            yield path
    else:
        yield pattern
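# illustrative sketch only: _iglob_pattern('/etc/nginx/conf.d/*.conf') yields whatever
# paths glob matches on this host, whereas a pattern with no wildcards (e.g.
# '/etc/nginx/mime.types') is yielded as-is even if the file does not exist.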


def _getline(filename, lineno):
    with open(filename, encoding='utf-8', errors='replace') as fp:
        for i, line in enumerate(fp, start=1):
            if i == lineno:
                return line.rstrip('\r\n')


class NginxConfigParser(object):
    """
    Parser responsible for parsing the NGINX config and following all includes.
    It is created on demand and discarded after use (to save system resources).
    """

    def __init__(self, filename='/etc/nginx/nginx.conf'):
        self.filename = filename
        self.directory = self._dirname(filename)
        self.files = {}
        self.directories = {}
        self.directory_map = {}
        self.errors = []
        self._broken_files = {}
        self._broken_directories = {}
        self.tree = {}
        self.includes = []
        self.ssl_certificates = []

    def _abspath(self, path):
        if not os.path.isabs(path):
            path = os.path.join(self.directory, path)
        return os.path.normpath(path)

    def _dirname(self, path):
        return os.path.dirname(path) + '/'
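    # illustrative sketch only: with the default filename, self.directory is '/etc/nginx/',
    # so self._abspath('conf.d/*.conf') resolves to '/etc/nginx/conf.d/*.conf' and
    # self._dirname('/etc/nginx/nginx.conf') returns '/etc/nginx/'.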

    def _handle_error(self, path, e, is_dir=False, exc_info=True, what='read'):
        """
        Stores and logs errors raised by reading and parsing the nginx config

        :param path: str - the absolute path of the file or directory
        :param e: Exception - the exception that was raised
        :param is_dir: bool - whether the path is for a directory
        :param exc_info: True or (exc_type, exc_value, exc_traceback)
        :param what: str - what action caused the error (used for logging)
        """
        exc_cls = e.__class__.__name__
        exc_msg = e.strerror if hasattr(e, 'strerror') else str(e)
        message = 'failed to %s %s due to: %s' % (what, path, exc_cls)
        self.errors.append(message)

        if is_dir:
            self._broken_directories[path] = '%s: %s' % (exc_cls, exc_msg)
            context.log.debug(message, exc_info=exc_info)
        else:
            self._broken_files[path] = '%s: %s' % (exc_cls, exc_msg)
            context.log.error(message)
            if isinstance(e, crossplane.errors.NgxParserDirectiveError):
                line = _getline(e.filename, e.lineno)
                context.log.debug('line where error was raised: %r' % line)
            context.log.debug('additional info:', exc_info=exc_info)
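    # illustrative sketch only: a failed read of a hypothetical '/etc/nginx/secret.conf'
    # would append 'failed to read /etc/nginx/secret.conf due to: PermissionError' to
    # self.errors and store 'PermissionError: Permission denied' in self._broken_files.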

    def _add_directory(self, dirname, check=False):
        if dirname not in self.directories:
            self.directories[dirname] = get_filesystem_info(dirname)
            if check:
                try:
                    # check that the directory can actually be read
                    scandir(dirname)
                except Exception as e:
                    self._handle_error(dirname, e, is_dir=True)

    def _add_file(self, filename):
        if filename not in self.files:
            dirname = self._dirname(filename)
            self._add_directory(dirname, check=True)
            try:
                info = get_filesystem_info(filename)
                # count lines without leaking the file handle
                with open(filename, encoding='utf-8', errors='replace') as fp:
                    info['lines'] = fp.read().count('\n')
                self.files[filename] = info
            except Exception as e:
                self._handle_error(filename, e, is_dir=False)

    def _scan_path_pattern(self, pattern):
        """Similar to glob.iglob, except it saves directory errors"""
        # just yield the file if it's a regular boring path with no magic
        magic = glob.magic_check.search(pattern)
        if magic is None:
            yield pattern
            return

        # find the deepest path before the first magic part
        elements = glob.magic_check.split(pattern, 1)
        anchor = elements[0]
        after = elements[-1]
        anchor, start = anchor.rsplit('/', 1)
        offset = anchor.count('/') + 1
        anchor = anchor or '/'

        # get all of the following path parts (>=1 will have magic)
        after = start + magic.group(0) + after
        parts = after.split('/')

        # used to handle directory errors when walking filesystem
        def onerror(e):
            dirname = e.filename + '/'
            if dirname not in self.directories:
                self.directories[dirname] = get_filesystem_info(dirname)
                self._handle_error(dirname, e, is_dir=True)

        # walk the filesystem to collect file paths (and directory errors)
        it = walk(anchor, followlinks=True, onerror=onerror)
        for root, dirs, files in it:
            # get the index of the current path part to use
            index = (root != '/') + root.count('/') - offset
            if index > len(parts) - 1:
                # must've followed a recursive link so go no deeper
                dirs[:] = []
            elif index < len(parts) - 1:
                # determine which directories to walk into next
                dirs[:] = _fnmatch_pattern(dirs, parts[index])
            else:
                # this is the last part, so yield from matching files
                for f in _fnmatch_pattern(files, parts[index]):
                    yield os.path.join(root, f)
                # yield from matching directories too
                for d in _fnmatch_pattern(dirs, parts[index]):
                    yield os.path.join(root, d) + '/'
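    # illustrative sketch only: for the pattern '/etc/nginx/conf.d/*.conf' the walk
    # starts at anchor '/etc/nginx/conf.d' with parts == ['*.conf'], so only entries in
    # that directory whose names match '*.conf' are yielded.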

    def _collect_included_files_and_cert_dirs(self, block, include_ssl_certs):
        for stmt in block:
            if stmt['directive'] == 'include':
                pattern = self._abspath(stmt['args'][0])
                if pattern not in self.includes:
                    self.includes.append(pattern)
                    # use found include patterns to check for os errors
                    for filename in self._scan_path_pattern(pattern):
                        self._add_file(filename)
            elif stmt['directive'] == 'ssl_certificate' and include_ssl_certs:
                cert = self._abspath(stmt['args'][0])
                if stmt['args'][0] and ('$' not in cert or ' if=$' in cert):
                    # add directories that only contain ssl cert files
                    if cert not in self.ssl_certificates:
                        self.ssl_certificates.append(cert)
                        dirname = self._dirname(cert)
                        self._add_directory(dirname, check=True)
            elif 'block' in stmt:
                self._collect_included_files_and_cert_dirs(stmt['block'], include_ssl_certs)
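    # illustrative sketch only: each stmt here is a crossplane directive payload along
    # the lines of {'directive': 'include', 'line': 14, 'args': ['conf.d/*.conf']},
    # with nested contexts (http, server, ...) carried in an optional 'block' list.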

    def parse(self, include_ssl_certs=True):
        # clear results from the previous run
        self.files = {}
        self.directories = {}

        # clear some bits and pieces from previous run
        self._broken_files = {}
        self._broken_directories = {}
        self.includes = []
        self.ssl_certificates = []

        # use the new parser to parse the nginx config
        self.tree = crossplane.parse(
            filename=self.filename,
            onerror=(lambda e: sys.exc_info()),
            catch_errors=True,
            ignore=IGNORED_DIRECTIVES
        )

        for error in self.tree['errors']:
            path = error['file']
            exc_info = error.pop('callback')
            try:
                # these error types are handled by this script already
                if not isinstance(exc_info[1], (OSError, IOError)):
                    self._handle_error(path, exc_info[1], exc_info=exc_info, what='parse')
                self._add_file(path)
            finally:
                # this speeds things up by deleting traceback, see python docs
                del exc_info

        # for every file in parsed payload, search for files/directories to add
        for config in self.tree['config']:
            if config['parsed']:
                self._add_file(config['file'])
                self._collect_included_files_and_cert_dirs(config['parsed'], include_ssl_certs=include_ssl_certs)

        # construct directory_map
        for dirname, info in self.directories.items():
            self.directory_map[dirname] = {'info': info, 'files': {}}

        for dirname, error in self._broken_directories.items():
            self.directory_map.setdefault(dirname, {'info': {}, 'files': {}})
            self.directory_map[dirname]['error'] = error

        for filename, info in self.files.items():
            dirname = self._dirname(filename)
            self.directory_map[dirname]['files'][filename] = {'info': info}

        for filename, error in self._broken_files.items():
            dirname = self._dirname(filename)
            self.directory_map[dirname]['files'].setdefault(filename, {'info': {}})
            self.directory_map[dirname]['files'][filename]['error'] = error
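    # illustrative sketch only: after parse(), self.directory_map looks roughly like
    # {'/etc/nginx/': {'info': {...}, 'files': {'/etc/nginx/nginx.conf': {'info': {...}}}}},
    # with an 'error' key added for any broken directory or file.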

    def simplify(self):
        """
        This will return one giant list that uses all of the includes logic
        to compile one large nginx context (similar to parsing nginx -T).
        It's very useful for post-analysis and testing.
        """
        def simplify_block(block):
            for stmt in block:
                # ignore comments
                if 'comment' in stmt:
                    continue

                # recurse deeper into block contexts
                if 'block' in stmt:
                    ctx = simplify_block(stmt['block'])
                    stmt = dict(stmt, block=list(ctx))

                yield stmt

                # do yield from contexts included from other files
                if stmt['directive'] == 'include':
                    for index in stmt['includes']:
                        incl_block = self.tree['config'][index]['parsed']
                        for incl_stmt in simplify_block(incl_block):
                            yield incl_stmt

        main_ctx = simplify_block(self.tree['config'][0]['parsed'])
        return list(main_ctx)
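    # illustrative sketch only: simplify() returns a flat list of directive dicts with
    # include directives expanded in place, i.e. the statements pulled in from included
    # files appear right after the include that referenced them (comments are dropped).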

    def get_structure(self, include_ssl_certs=False):
        """
        Collects included files, ssl cert files, and their directories and
        then returns them as dicts with mtimes, sizes, and permissions

        :param include_ssl_certs: bool - include ssl certs or not
        :return: (dict, dict) - files, directories
        """
        files = {}

        if include_ssl_certs:
            regex = INCLUDE_CERT_RE
            has_directive = lambda line: 'include' in line or 'ssl_certificate' in line
        else:
            regex = INCLUDE_ONLY_RE
            has_directive = lambda line: 'include' in line

        def _skim_file(filename):
            """
            Recursively skims nginx configs for include and ssl_certificate
            directives, yielding paths of the files they reference on the way
            """
            if filename in files:
                return

            yield filename
            try:
                # search each line for include or ssl_certificate directives
                with open(filename, encoding='utf-8', errors='replace') as lines:
                    for line in lines:
                        if not has_directive(line):
                            continue
                        for match in regex.finditer(line):
                            if not match:
                                continue
                            file_pattern = self._abspath(match.group(3))

                            # add directory but don't use self._scan_path_pattern
                            # because we don't need to collect directory errors
                            dir_pattern = self._dirname(file_pattern)
                            for path in _iglob_pattern(dir_pattern):
                                self._add_directory(path, check=True)

                            # yield from matching files using _iglob_pattern
                            for path in _iglob_pattern(file_pattern):
                                if match.group(1) == 'include':
                                    for p in _skim_file(path):
                                        yield p
                                else:
                                    yield path
            except Exception as e:
                self._handle_error(filename, e, is_dir=False)

        # collect file names and get mtimes, sizes, and permissions for them
        for fname in _skim_file(self.filename):
            files[fname] = get_filesystem_info(fname)

        return files, self.directories
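

# Minimal usage sketch (assumptions: a configured agent context and a readable config
# at the hypothetical path below; not part of the original module):
#
#     parser = NginxConfigParser('/etc/nginx/nginx.conf')
#     parser.parse(include_ssl_certs=True)
#     flattened = parser.simplify()           # one big context, similar to `nginx -T`
#     files, dirs = parser.get_structure()    # light-weight scan of include directives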