372 lines
14 KiB
Python
372 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
import fnmatch
|
|
import glob
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import crossplane
|
|
|
|
try:
|
|
from os import scandir, walk
|
|
except ImportError:
|
|
from scandir import scandir, walk
|
|
|
|
from amplify.agent.common.context import context
|
|
|
|
__author__ = 'Arie van Luttikhuizen'
|
|
__copyright__ = 'Copyright (C) Nginx, Inc. All rights reserved.'
|
|
__license__ = ''
|
|
__maintainer__ = 'Arie van Luttikhuizen'
|
|
__email__ = 'arie@nginx.com'
|
|
|
|
# Regular expressions for light-weight, line-by-line skimming of config files
# (see NginxConfigParser.get_structure).  A match has to begin a statement,
# i.e. sit at the start of the line or right after ';', '{' or '}':
#   group 1 - the directive name
#   group 2 - the optional quote character opening the argument
#   group 3 - the argument itself (lazily, never running into a '#' comment)
INCLUDE_ONLY_RE = re.compile(r'(?:^|[;{}])\s*(include)\s+([\'"]?)([^#]*?)\2\s*?(?=;)')
INCLUDE_CERT_RE = re.compile(r'(?:^|[;{}])\s*(include|ssl_certificate)\s+([\'"]?)([^#]*?)\2\s*?(?=;)')

# Directives passed to crossplane.parse(ignore=...) by NginxConfigParser.parse.
# Nothing is ignored when the agent runs under the name 'controller'.
if context.agent_name == 'controller':
    IGNORED_DIRECTIVES = []
else:
    IGNORED_DIRECTIVES = frozenset((
        'ssl_certificate_key',
        'ssl_client_certificate',
        'ssl_password_file',
        'ssl_stapling_file',
        'ssl_trusted_certificate',
        'auth_basic_user_file',
        'secure_link_secret',
    ))
|
|
|
|
|
|
def get_filesystem_info(path):
    """
    Returns the size, mtime, and permissions of a file or directory

    This is best-effort: if the path cannot be stat'ed the failure is logged
    at debug level and the default values (0, 0, '0000') are returned.

    :param path: str - path of the file or directory to inspect
    :return: dict - {'size': int, 'mtime': int, 'permissions': str} where
        permissions is a zero-padded, four digit octal string such as '0644'
    """
    size, mtime, permissions = 0, 0, '0000'
    try:
        info = os.stat(path)
    except Exception as e:
        exc_cls = e.__class__.__name__
        message = 'failed to stat %s do to %s' % (path, exc_cls)
        context.log.debug(message, exc_info=True)
    else:
        size = info.st_size
        mtime = int(info.st_mtime)
        # format() rather than oct(): on Python 3 oct() returns '0o644', which
        # zfill(4) cannot turn into the four digit form used by the default
        permissions = format(info.st_mode & 0o0777, '04o')

    # returning here (not from a ``finally``) lets KeyboardInterrupt and
    # SystemExit propagate instead of being silently discarded
    return {'size': size, 'mtime': mtime, 'permissions': permissions}
|
|
|
|
|
|
def _fnmatch_pattern(names, pttn):
    """
    Filters a list of names down to the ones selected by a pattern

    :param names: list - candidate file or directory names
    :param pttn: str - a literal name or a pattern with glob magic in it
    :return: list - the fnmatch-filtered names when the pattern has magic,
        otherwise [pttn] if that exact name is present, else []
    """
    if not glob.has_magic(pttn):
        # literal name: it either is one of the candidates or it is not
        return [pttn] if pttn in names else []
    return fnmatch.filter(names, pttn)
|
|
|
|
|
|
def _iglob_pattern(pattern):
    """
    Yields the paths selected by a pattern

    A pattern without glob magic is yielded back unchanged, without checking
    that it exists; otherwise the results of glob.iglob are yielded.

    :param pattern: str - a plain path or a glob pattern
    """
    if not glob.has_magic(pattern):
        yield pattern
        return
    yield from glob.iglob(pattern)
|
|
|
|
|
|
def _getline(filename, lineno):
    """
    Returns one line of a file with its line ending stripped

    :param filename: str - path of the file to read
    :param lineno: int - 1-based number of the wanted line
    :return: str or None - the line, or None if the file has no such line
    """
    with open(filename, encoding='utf-8', errors='replace') as fp:
        wanted = (
            text.rstrip('\r\n')
            for number, text in enumerate(fp, start=1)
            if number == lineno
        )
        return next(wanted, None)
|
|
|
|
|
|
class NginxConfigParser(object):
    """
    Parser responsible for parsing the NGINX config and following all includes.
    It is created on demand and discarded after use (to save system resources).
    """

    def __init__(self, filename='/etc/nginx/nginx.conf'):
        """
        :param filename: str - path of the main nginx config file
        """
        self.filename = filename
        # relative include/certificate paths are resolved against this
        self.directory = self._dirname(filename)

        # path -> filesystem info, filled in by parse() and get_structure()
        self.files = {}
        self.directories = {}
        # dirname -> {'info', 'files', optional 'error'}, built by parse()
        self.directory_map = {}

        # human readable messages plus path -> 'ExcClass: message' lookups
        self.errors = []
        self._broken_files = {}
        self._broken_directories = {}

        # the raw payload returned by crossplane.parse()
        self.tree = {}

        # absolute include patterns / certificate paths found by parse()
        self.includes = []
        self.ssl_certificates = []

    def _abspath(self, path):
        """
        Returns the normalized absolute form of a path; relative paths are
        resolved against the directory of the main config file

        :param path: str - absolute or relative path (may contain glob magic)
        :return: str - normalized absolute path
        """
        if not os.path.isabs(path):
            path = os.path.join(self.directory, path)
        return os.path.normpath(path)

    def _dirname(self, path):
        """
        Returns the directory part of a path, always with a trailing slash

        :param path: str - a file path
        :return: str - e.g. '/etc/nginx/' for '/etc/nginx/nginx.conf'
        """
        return os.path.dirname(path) + '/'

    def _handle_error(self, path, e, is_dir=False, exc_info=True, what='read'):
        """
        Stores and logs errors raised by reading and parsing the nginx config

        :param path: str - the absolute path of the file or directory
        :param e: Exception - the exception that was raised
        :param is_dir: bool - whether the path is for a directory
        :param exc_info: True or (exc_type, exc_value, exc_traceback)
        :param what: str - what action caused the error (used for logging)
        """
        exc_cls = e.__class__.__name__
        # strerror can exist but be None (e.g. OSError('message')), so fall
        # back to str(e) rather than recording "OSError: None"
        exc_msg = getattr(e, 'strerror', None) or str(e)
        message = 'failed to %s %s due to: %s' % (what, path, exc_cls)
        self.errors.append(message)

        if is_dir:
            self._broken_directories[path] = '%s: %s' % (exc_cls, exc_msg)
            context.log.debug(message, exc_info=exc_info)
            return

        self._broken_files[path] = '%s: %s' % (exc_cls, exc_msg)
        context.log.error(message)

        # for directive errors also log the offending line of the config
        if isinstance(e, crossplane.errors.NgxParserDirectiveError):
            line = _getline(e.filename, e.lineno)
            context.log.debug('line where error was raised: %r' % line)

        context.log.debug('additional info:', exc_info=exc_info)

    def _add_directory(self, dirname, check=False):
        """
        Records filesystem info for a directory the first time it is seen

        :param dirname: str - directory path (with trailing slash)
        :param check: bool - also try to list the directory and record an
            error for it if that fails
        """
        if dirname in self.directories:
            return

        self.directories[dirname] = get_filesystem_info(dirname)
        if check:
            try:
                entries = scandir(dirname)
                # only the ability to open the directory matters, so release
                # the handle right away when the iterator supports it
                close = getattr(entries, 'close', None)
                if close is not None:
                    close()
            except Exception as e:
                self._handle_error(dirname, e, is_dir=True)

    def _add_file(self, filename):
        """
        Records filesystem info and the line count for a file the first time
        it is seen, together with its directory; read errors are stored via
        _handle_error instead of being raised

        :param filename: str - absolute path of the file
        """
        if filename in self.files:
            return

        dirname = self._dirname(filename)
        self._add_directory(dirname, check=True)
        try:
            info = get_filesystem_info(filename)
            with open(filename, encoding='utf-8', errors='replace') as fp:
                info['lines'] = fp.read().count('\n')
            self.files[filename] = info
        except Exception as e:
            self._handle_error(filename, e, is_dir=False)

    def _scan_path_pattern(self, pattern):
        """
        Similar to glob.iglob, except it saves directory errors

        Matching directories are yielded with a trailing slash.

        :param pattern: str - absolute path or glob pattern
        """
        # just yield the file if it's a regular boring path with no magic
        magic = glob.magic_check.search(pattern)
        if magic is None:
            yield pattern
            return

        # find the deepest path before the first magic part
        prefix = pattern[:magic.start()]
        anchor, start = prefix.rsplit('/', 1)

        # depth of the first directory level that has to be matched
        offset = anchor.count('/') + 1
        anchor = anchor or '/'

        # get all of the following path parts (>=1 will have magic)
        parts = (start + pattern[magic.start():]).split('/')
        last = len(parts) - 1

        # used to handle directory errors when walking filesystem
        def onerror(e):
            dirname = e.filename + '/'
            if dirname not in self.directories:
                self.directories[dirname] = get_filesystem_info(dirname)
                self._handle_error(dirname, e, is_dir=True)

        # walk the filesystem to collect file paths (and directory errors)
        for root, dirs, files in walk(anchor, followlinks=True, onerror=onerror):
            # get the index of the current path part to use
            index = (root != '/') + root.count('/') - offset

            if index > last:
                # must've followed a recursive link so go no deeper
                dirs[:] = []
            elif index < last:
                # determine which directories to walk into next
                dirs[:] = _fnmatch_pattern(dirs, parts[index])
            else:
                # this is the last part, so yield from matching files
                for f in _fnmatch_pattern(files, parts[index]):
                    yield os.path.join(root, f)

                # yield from matching directories too
                for d in _fnmatch_pattern(dirs, parts[index]):
                    yield os.path.join(root, d) + '/'

    def _collect_included_files_and_cert_dirs(self, block, include_ssl_certs):
        """
        Walks a parsed block recursively, registering every included file and
        (optionally) every ssl certificate together with its directory

        :param block: list - parsed statements (dicts) from crossplane
        :param include_ssl_certs: bool - whether to handle ssl_certificate
        """
        for stmt in block:
            directive = stmt['directive']

            if directive == 'include':
                pattern = self._abspath(stmt['args'][0])
                if pattern not in self.includes:
                    self.includes.append(pattern)

                    # use found include patterns to check for os errors
                    for filename in self._scan_path_pattern(pattern):
                        self._add_file(filename)

            elif directive == 'ssl_certificate' and include_ssl_certs:
                cert = self._abspath(stmt['args'][0])
                if stmt['args'][0] and ('$' not in cert or ' if=$' in cert):
                    # add directories that only contain ssl cert files
                    if cert not in self.ssl_certificates:
                        self.ssl_certificates.append(cert)
                    self._add_directory(self._dirname(cert), check=True)

            elif 'block' in stmt:
                self._collect_included_files_and_cert_dirs(stmt['block'], include_ssl_certs)

    def parse(self, include_ssl_certs=True):
        """
        Parses the config with crossplane and rebuilds files, directories,
        includes, ssl_certificates and directory_map from the result

        :param include_ssl_certs: bool - also collect ssl certificates and
            the directories that contain them
        """
        # clear results from the previous run
        self.files = {}
        self.directories = {}
        # directory_map is derived purely from the results of this run, so
        # entries left over from a previous run would be stale
        self.directory_map = {}

        # clear some bits and pieces from previous run
        self._broken_files = {}
        self._broken_directories = {}
        self.includes = []
        self.ssl_certificates = []

        # use the new parser to parse the nginx config
        self.tree = crossplane.parse(
            filename=self.filename,
            onerror=(lambda e: sys.exc_info()),
            catch_errors=True,
            ignore=IGNORED_DIRECTIVES
        )

        for error in self.tree['errors']:
            path = error['file']
            exc_info = error.pop('callback')
            try:
                # these error types are handled by this script already
                if not isinstance(exc_info[1], (OSError, IOError)):
                    self._handle_error(path, exc_info[1], exc_info=exc_info, what='parse')
                    self._add_file(path)
            finally:
                # this speeds things up by deleting traceback, see python docs
                del exc_info

        # for every file in parsed payload, search for files/directories to add
        for config in self.tree['config']:
            if config['parsed']:
                self._add_file(config['file'])
                self._collect_included_files_and_cert_dirs(config['parsed'], include_ssl_certs=include_ssl_certs)

        # construct directory_map
        for dirname, info in self.directories.items():
            self.directory_map[dirname] = {'info': info, 'files': {}}

        for dirname, error in self._broken_directories.items():
            entry = self.directory_map.setdefault(dirname, {'info': {}, 'files': {}})
            entry['error'] = error

        # setdefault below keeps a file whose directory was never registered
        # from raising KeyError while the map is being assembled
        for filename, info in self.files.items():
            dirname = self._dirname(filename)
            entry = self.directory_map.setdefault(dirname, {'info': {}, 'files': {}})
            entry['files'][filename] = {'info': info}

        for filename, error in self._broken_files.items():
            dirname = self._dirname(filename)
            entry = self.directory_map.setdefault(dirname, {'info': {}, 'files': {}})
            entry['files'].setdefault(filename, {'info': {}})['error'] = error

    def simplify(self):
        """
        This will return one giant list that uses all of the includes logic
        to compile one large nginx context (similar to parsing nginx -T).
        It's very useful for post-analysis and testing.

        :return: list - statements of the main context with the contents of
            included files placed right after their include statement
        """
        def simplify_block(block):
            for stmt in block:
                # ignore comments
                if 'comment' in stmt:
                    continue

                # recurse deeper into block contexts (copy, don't mutate tree)
                if 'block' in stmt:
                    ctx = simplify_block(stmt['block'])
                    stmt = dict(stmt, block=list(ctx))

                yield stmt

                # do yield from contexts included from other files
                if stmt['directive'] == 'include':
                    for index in stmt['includes']:
                        incl_block = self.tree['config'][index]['parsed']
                        yield from simplify_block(incl_block)

        main_ctx = simplify_block(self.tree['config'][0]['parsed'])
        return list(main_ctx)

    def get_structure(self, include_ssl_certs=False):
        """
        Collects included files, ssl cert files, and their directories and
        then returns them as dicts with mtimes, sizes, and permissions

        :param include_ssl_certs: bool - include ssl certs or not
        :return: (dict, dict) - files, directories
        """
        files = {}

        if include_ssl_certs:
            regex = INCLUDE_CERT_RE
            keywords = ('include', 'ssl_certificate')
        else:
            regex = INCLUDE_ONLY_RE
            keywords = ('include',)

        def _skim_file(filename):
            """
            Recursively skims nginx configs for include and ssl_certificate
            directives, yielding paths of the files they reference on the way
            """
            # the consumer below stores each yielded file in ``files`` before
            # resuming this generator, so this also stops include cycles
            if filename in files:
                return

            yield filename
            try:
                # search each line for include or ssl_certificate directives
                with open(filename, encoding='utf-8', errors='replace') as lines:
                    for line in lines:
                        # cheap substring test before running the regex
                        if not any(keyword in line for keyword in keywords):
                            continue

                        for match in regex.finditer(line):
                            file_pattern = self._abspath(match.group(3))

                            # add directory but don't use self._scan_path_pattern
                            # because we don't need to collect directory errors
                            dir_pattern = self._dirname(file_pattern)
                            for path in _iglob_pattern(dir_pattern):
                                self._add_directory(path, check=True)

                            # yield from matching files using _iglob_pattern
                            for path in _iglob_pattern(file_pattern):
                                if match.group(1) == 'include':
                                    yield from _skim_file(path)
                                else:
                                    yield path
            except Exception as e:
                self._handle_error(filename, e, is_dir=False)

        # collect file names and get mtimes, sizes, and permissions for them
        for fname in _skim_file(self.filename):
            files[fname] = get_filesystem_info(fname)

        return files, self.directories
|