diff --git a/bin/fail2ban-testcases b/bin/fail2ban-testcases index 7e5f7aca..4a93ac95 100755 --- a/bin/fail2ban-testcases +++ b/bin/fail2ban-testcases @@ -37,10 +37,8 @@ if os.path.exists("fail2ban/__init__.py"): sys.path.insert(0, ".") from fail2ban.version import version -from fail2ban.tests.utils import gatherTests -from fail2ban.helpers import FormatterWithTraceBack, getLogger +from fail2ban.tests.utils import initProcess, gatherTests from fail2ban.setup import updatePyExec -from fail2ban.server.mytime import MyTime from optparse import OptionParser, Option @@ -63,6 +61,10 @@ def get_opt_parser(): choices=('heavydebug', 'debug', 'info', 'notice', 'warning', 'error', 'critical'), default=None, help="Log level for the logger to use during running tests"), + Option("--log-direct", action="store_false", + dest="log_lazy", + default=True, + help="Prevent lazy logging inside tests"), Option('-n', "--no-network", action="store_true", dest="no_network", help="Do not run tests that require the network"), @@ -91,56 +93,15 @@ parser = get_opt_parser() (opts, regexps) = parser.parse_args() # -# Logging +# Process initialization corresponding options (logging, default options, etc.) # -logSys = getLogger("fail2ban") - -# Numerical level of verbosity corresponding to a log "level" -verbosity = {'heavydebug': 4, - 'debug': 3, - 'info': 2, - 'notice': 2, - 'warning': 1, - 'error': 1, - 'critical': 0, - None: 1}[opts.log_level] - -if opts.log_level is not None: # pragma: no cover - # so we had explicit settings - logSys.setLevel(getattr(logging, opts.log_level.upper())) -else: # pragma: no cover - # suppress the logging but it would leave unittests' progress dots - # ticking, unless like with '-l critical' which would be silent - # unless error occurs - logSys.setLevel(getattr(logging, 'CRITICAL')) - -# Add the default logging handler -stdout = logging.StreamHandler(sys.stdout) - -fmt = ' %(message)s' - -if opts.log_traceback: - Formatter = FormatterWithTraceBack - fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt -else: - Formatter = logging.Formatter - -# Custom log format for the verbose tests runs -if verbosity > 1: # pragma: no cover - if verbosity > 3: - fmt = ' | %(module)15.15s-%(levelno)-2d: %(funcName)-20.20s |' + fmt - if verbosity > 2: - fmt = ' +%(relativeCreated)5d %(thread)X %(name)-25.25s %(levelname)-5.5s' + fmt - else: - fmt = ' %(asctime)-15s %(thread)X %(levelname)-5.5s' + fmt -# -stdout.setFormatter(Formatter(fmt)) -logSys.addHandler(stdout) +opts = initProcess(opts) +verbosity = opts.verbosity # # Let know the version # -if not opts.log_level or opts.log_level != 'critical': # pragma: no cover +if opts.log_level != logging.CRITICAL: # pragma: no cover print("Fail2ban %s test suite. Python %s. Please wait..." \ % (version, str(sys.version).replace('\n', ''))) diff --git a/fail2ban/setup.py b/fail2ban/setup.py index 87d50e66..efaa31a4 100644 --- a/fail2ban/setup.py +++ b/fail2ban/setup.py @@ -32,7 +32,7 @@ def updatePyExec(bindir, executable=None): executable = sys.executable pypath = os.path.join(bindir, 'fail2ban-python') # if not exists or point to another version - update link: - isfile = os.path.isfile(pypath) + isfile = os.path.isfile(os.path.realpath(pypath)) if not isfile or os.path.realpath(pypath) != os.path.realpath(executable): if isfile: os.unlink(pypath) diff --git a/fail2ban/tests/misctestcase.py b/fail2ban/tests/misctestcase.py index 740a4293..5b6b93a6 100644 --- a/fail2ban/tests/misctestcase.py +++ b/fail2ban/tests/misctestcase.py @@ -88,6 +88,7 @@ else: def _getSysPythonVersion(): return _sh_call("fail2ban-python -c 'import sys; print(tuple(sys.version_info))'") + class SetupTest(unittest.TestCase): def setUp(self): @@ -109,9 +110,11 @@ class SetupTest(unittest.TestCase): if not self.setup: return # if verbose skip didn't work out tmp = tempfile.mkdtemp() + # suppress stdout (and stderr) if not heavydebug + supdbgout = ' >/dev/null' if unittest.F2B.log_level >= logging.DEBUG else '' # HEAVYDEBUG try: - os.system("%s %s install --root=%s >/dev/null" - % (sys.executable, self.setup, tmp)) + os.system("%s %s install --disable-2to3 --dry-run --root=%s%s" + % (sys.executable, self.setup, tmp, supdbgout)) def strippath(l): return [x[len(tmp)+1:] for x in l] @@ -162,8 +165,8 @@ class SetupTest(unittest.TestCase): # clean up shutil.rmtree(tmp) # remove build directory - os.system("%s %s clean --all >/dev/null 2>&1" - % (sys.executable, self.setup)) + os.system("%s %s clean --all%s" + % (sys.executable, self.setup, (supdbgout + ' 2>&1') if supdbgout else '')) class TestsUtilsTest(LogCaptureTestCase): @@ -304,6 +307,17 @@ class TestsUtilsTest(LogCaptureTestCase): self.assertTrue(pindex > 10) # we should have some traceback self.assertEqual(s[:pindex], s[pindex+1:pindex*2 + 1]) + def testLazyLogging(self): + logSys = DefLogSys + if unittest.F2B.log_lazy: + # wrong logging syntax will throw an error lazy (on demand): + logSys.debug('test', 1, 2, 3) + self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: self.assertNotLogged('test')) + else: # pragma: no cover + # wrong logging syntax will throw an error directly: + self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: logSys.debug('test', 1, 2, 3)) + + iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z") diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index f6333fb8..232da780 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -31,9 +31,10 @@ import tempfile import shutil import sys import time +import threading import unittest -from StringIO import StringIO +from cStringIO import StringIO from functools import wraps from ..helpers import getLogger @@ -59,12 +60,74 @@ if not CONFIG_DIR: os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname( os.path.abspath(__file__))))) -class F2B(optparse.Values): - def __init__(self, opts={}): - self.__dict__ = opts.__dict__ if opts else { - 'fast': False, 'memory_db':False, 'no_gamin': False, 'no_network': False, - "negate_re": False, +# Default options, if running from installer (setup.py): +class DefaultTestOptions(optparse.Values): + def __init__(self): + self.__dict__ = { + 'log_level': None, 'log_lazy': True, + 'log_traceback': None, 'full_traceback': None, + 'fast': False, 'memory_db': False, 'no_gamin': False, + 'no_network': False, 'negate_re': False } + +# +# Initialization +# +def initProcess(opts): + # Logger: + logSys = getLogger("fail2ban") + + # Numerical level of verbosity corresponding to a log "level" + verbosity = {'heavydebug': 4, + 'debug': 3, + 'info': 2, + 'notice': 2, + 'warning': 1, + 'error': 1, + 'critical': 0, + None: 1}[opts.log_level] + opts.verbosity = verbosity + + if opts.log_level is not None: # pragma: no cover + # so we had explicit settings + logSys.setLevel(getattr(logging, opts.log_level.upper())) + else: # pragma: no cover + # suppress the logging but it would leave unittests' progress dots + # ticking, unless like with '-l critical' which would be silent + # unless error occurs + logSys.setLevel(logging.CRITICAL) + opts.log_level = logSys.level + + # Add the default logging handler + stdout = logging.StreamHandler(sys.stdout) + + fmt = ' %(message)s' + + if opts.log_traceback: # pragma: no cover + from ..helpers import FormatterWithTraceBack as Formatter + fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt + else: + Formatter = logging.Formatter + + # Custom log format for the verbose tests runs + if verbosity > 1: # pragma: no cover + if verbosity > 3: + fmt = ' | %(module)15.15s-%(levelno)-2d: %(funcName)-20.20s |' + fmt + if verbosity > 2: + fmt = ' +%(relativeCreated)5d %(thread)X %(name)-25.25s %(levelname)-5.5s' + fmt + else: + fmt = ' %(asctime)-15s %(thread)X %(levelname)-5.5s' + fmt + + # + stdout.setFormatter(Formatter(fmt)) + logSys.addHandler(stdout) + # + return opts; + + +class F2B(DefaultTestOptions): + def __init__(self, opts): + self.__dict__ = opts.__dict__ if self.fast: self.memory_db = True self.no_gamin = True @@ -96,7 +159,27 @@ def with_tmpdir(f): return wrapper +# backwards compatibility to python 2.6: +if not hasattr(unittest, 'SkipTest'): # pragma: no cover + class SkipTest(Exception): + pass + unittest.SkipTest = SkipTest + _org_AddError = unittest._TextTestResult.addError + def addError(self, test, err): + if err[0] is SkipTest: + if self.showAll: + self.stream.writeln(str(err[1])) + elif self.dots: + self.stream.write('s') + self.stream.flush() + return + _org_AddError(self, test, err) + unittest._TextTestResult.addError = addError + def initTests(opts): + ## if running from installer (setup.py): + if not opts: + opts = initProcess(DefaultTestOptions()) unittest.F2B = F2B(opts) # --fast : if unittest.F2B.fast: # pragma: no cover @@ -104,10 +187,9 @@ def initTests(opts): # (prevent long sleeping during test cases ... less time goes to sleep): Utils.DEFAULT_SLEEP_TIME = 0.0025 Utils.DEFAULT_SLEEP_INTERVAL = 0.0005 - if sys.version_info >= (2,7): # no skip in previous version: - def F2B_SkipIfFast(): - raise unittest.SkipTest('Skip test because of "--fast"') - unittest.F2B.SkipIfFast = F2B_SkipIfFast + def F2B_SkipIfFast(): + raise unittest.SkipTest('Skip test because of "--fast"') + unittest.F2B.SkipIfFast = F2B_SkipIfFast else: # sleep intervals are large - use replacement for sleep to check time to sleep: _org_sleep = time.sleep @@ -368,6 +450,66 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'): class LogCaptureTestCase(unittest.TestCase): + class _MemHandler(logging.Handler): + """Logging handler helper + + Affords not to delegate logging to StreamHandler at all, + format lazily on demand in getvalue. + Increases performance inside the LogCaptureTestCase tests, because there + the log level set to DEBUG. + """ + + def __init__(self, lazy=True): + self._lock = threading.Lock() + self._val = None + self._recs = list() + self._strm = StringIO() + logging.Handler.__init__(self) + if lazy: + self.handle = self._handle_lazy + + def truncate(self, size=None): + """Truncate the internal buffer and records.""" + if size: + raise Exception('invalid size argument: %r, should be None or 0' % size) + with self._lock: + self._strm.truncate(0) + self._val = None + self._recs = list() + + def __write(self, record): + msg = record.getMessage() + '\n' + try: + self._strm.write(msg) + except UnicodeEncodeError: + self._strm.write(msg.encode('UTF-8')) + + def getvalue(self): + """Return current buffer as whole string.""" + with self._lock: + # cached: + if self._val is not None: + return self._val + # submit already emitted (delivered to handle) records: + for record in self._recs: + self.__write(record) + self._recs = list() + # cache and return: + self._val = self._strm.getvalue() + return self._val + + def handle(self, record): # pragma: no cover + """Handle the specified record direct (not lazy)""" + with self._lock: + self._val = None + self.__write(record) + + def _handle_lazy(self, record): + """Lazy handle the specified record on demand""" + with self._lock: + self._val = None + self._recs.append(record) + def setUp(self): # For extended testing of what gets output into logging @@ -378,13 +520,13 @@ class LogCaptureTestCase(unittest.TestCase): self._old_level = logSys.level self._old_handlers = logSys.handlers # Let's log everything into a string - self._log = StringIO() - logSys.handlers = [logging.StreamHandler(self._log)] + self._log = LogCaptureTestCase._MemHandler(unittest.F2B.log_lazy) + logSys.handlers = [self._log] if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)! print("") logSys.handlers += self._old_handlers logSys.debug('='*10 + ' %s ' + '='*20, self.id()) - logSys.setLevel(getattr(logging, 'DEBUG')) + logSys.setLevel(logging.DEBUG) def tearDown(self): """Call after every test case.""" diff --git a/setup.py b/setup.py index f9f57976..b3645f01 100755 --- a/setup.py +++ b/setup.py @@ -27,21 +27,26 @@ import platform try: import setuptools from setuptools import setup + from setuptools.command.install import install + from setuptools.command.install_scripts import install_scripts except ImportError: setuptools = None from distutils.core import setup +# all versions +from distutils.command.build_py import build_py +from distutils.command.build_scripts import build_scripts +if setuptools is None: + from distutils.command.install import install + from distutils.command.install_scripts import install_scripts try: # python 3.x - from distutils.command.build_py import build_py_2to3 as build_py - from distutils.command.build_scripts \ - import build_scripts_2to3 as build_scripts + from distutils.command.build_py import build_py_2to3 + from distutils.command.build_scripts import build_scripts_2to3 + _2to3 = True except ImportError: # python 2.x - from distutils.command.build_py import build_py - from distutils.command.build_scripts import build_scripts -# all versions -from distutils.command.install_scripts import install_scripts + _2to3 = False import os from os.path import isfile, join, isdir, realpath @@ -66,6 +71,27 @@ class install_scripts_f2b(install_scripts): updatePyExec(bindir) return outputs +# Wrapper to specify fail2ban own options: +class install_command_f2b(install): + user_options = install.user_options + [ + ('disable-2to3', None, 'Specify to deactivate 2to3, e.g. if the install runs from fail2ban test-cases.'), + ] + def initialize_options(self): + self.disable_2to3 = None + install.initialize_options(self) + def finalize_options(self): + global _2to3 + ## in the test cases 2to3 should be already done (fail2ban-2to3): + if self.disable_2to3: + _2to3 = False + if _2to3: + cmdclass = self.distribution.cmdclass + cmdclass['build_py'] = build_py_2to3 + cmdclass['build_scripts'] = build_scripts_2to3 + install.finalize_options(self) + def run(self): + install.run(self) + # Update fail2ban-python env to current python version (where f2b-modules located/installed) rootdir = os.path.realpath(os.path.dirname( @@ -143,7 +169,7 @@ setup( platforms = "Posix", cmdclass = { 'build_py': build_py, 'build_scripts': build_scripts, - 'install_scripts': install_scripts_f2b + 'install_scripts': install_scripts_f2b, 'install': install_command_f2b }, scripts = [ 'bin/fail2ban-client',