Merge pull request #1510 from sebres/_0.10/optimize-setup-test-case

0.10/optimize setup test case
pull/1516/head
Serg G. Brester 2016-08-16 10:16:42 +02:00 committed by GitHub
commit 9a30cfa77d
5 changed files with 217 additions and 74 deletions

View File

@ -37,10 +37,8 @@ if os.path.exists("fail2ban/__init__.py"):
sys.path.insert(0, ".")
from fail2ban.version import version
from fail2ban.tests.utils import gatherTests
from fail2ban.helpers import FormatterWithTraceBack, getLogger
from fail2ban.tests.utils import initProcess, gatherTests
from fail2ban.setup import updatePyExec
from fail2ban.server.mytime import MyTime
from optparse import OptionParser, Option
@ -63,6 +61,10 @@ def get_opt_parser():
choices=('heavydebug', 'debug', 'info', 'notice', 'warning', 'error', 'critical'),
default=None,
help="Log level for the logger to use during running tests"),
Option("--log-direct", action="store_false",
dest="log_lazy",
default=True,
help="Prevent lazy logging inside tests"),
Option('-n', "--no-network", action="store_true",
dest="no_network",
help="Do not run tests that require the network"),
@ -91,56 +93,15 @@ parser = get_opt_parser()
(opts, regexps) = parser.parse_args()
#
# Logging
# Process initialization corresponding options (logging, default options, etc.)
#
logSys = getLogger("fail2ban")
# Numerical level of verbosity corresponding to a log "level"
verbosity = {'heavydebug': 4,
'debug': 3,
'info': 2,
'notice': 2,
'warning': 1,
'error': 1,
'critical': 0,
None: 1}[opts.log_level]
if opts.log_level is not None: # pragma: no cover
# so we had explicit settings
logSys.setLevel(getattr(logging, opts.log_level.upper()))
else: # pragma: no cover
# suppress the logging but it would leave unittests' progress dots
# ticking, unless like with '-l critical' which would be silent
# unless error occurs
logSys.setLevel(getattr(logging, 'CRITICAL'))
# Add the default logging handler
stdout = logging.StreamHandler(sys.stdout)
fmt = ' %(message)s'
if opts.log_traceback:
Formatter = FormatterWithTraceBack
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
else:
Formatter = logging.Formatter
# Custom log format for the verbose tests runs
if verbosity > 1: # pragma: no cover
if verbosity > 3:
fmt = ' | %(module)15.15s-%(levelno)-2d: %(funcName)-20.20s |' + fmt
if verbosity > 2:
fmt = ' +%(relativeCreated)5d %(thread)X %(name)-25.25s %(levelname)-5.5s' + fmt
else:
fmt = ' %(asctime)-15s %(thread)X %(levelname)-5.5s' + fmt
#
stdout.setFormatter(Formatter(fmt))
logSys.addHandler(stdout)
opts = initProcess(opts)
verbosity = opts.verbosity
#
# Let know the version
#
if not opts.log_level or opts.log_level != 'critical': # pragma: no cover
if opts.log_level != logging.CRITICAL: # pragma: no cover
print("Fail2ban %s test suite. Python %s. Please wait..." \
% (version, str(sys.version).replace('\n', '')))

View File

@ -32,7 +32,7 @@ def updatePyExec(bindir, executable=None):
executable = sys.executable
pypath = os.path.join(bindir, 'fail2ban-python')
# if not exists or point to another version - update link:
isfile = os.path.isfile(pypath)
isfile = os.path.isfile(os.path.realpath(pypath))
if not isfile or os.path.realpath(pypath) != os.path.realpath(executable):
if isfile:
os.unlink(pypath)

View File

@ -88,6 +88,7 @@ else:
def _getSysPythonVersion():
return _sh_call("fail2ban-python -c 'import sys; print(tuple(sys.version_info))'")
class SetupTest(unittest.TestCase):
def setUp(self):
@ -109,9 +110,11 @@ class SetupTest(unittest.TestCase):
if not self.setup:
return # if verbose skip didn't work out
tmp = tempfile.mkdtemp()
# suppress stdout (and stderr) if not heavydebug
supdbgout = ' >/dev/null' if unittest.F2B.log_level >= logging.DEBUG else '' # HEAVYDEBUG
try:
os.system("%s %s install --root=%s >/dev/null"
% (sys.executable, self.setup, tmp))
os.system("%s %s install --disable-2to3 --dry-run --root=%s%s"
% (sys.executable, self.setup, tmp, supdbgout))
def strippath(l):
return [x[len(tmp)+1:] for x in l]
@ -162,8 +165,8 @@ class SetupTest(unittest.TestCase):
# clean up
shutil.rmtree(tmp)
# remove build directory
os.system("%s %s clean --all >/dev/null 2>&1"
% (sys.executable, self.setup))
os.system("%s %s clean --all%s"
% (sys.executable, self.setup, (supdbgout + ' 2>&1') if supdbgout else ''))
class TestsUtilsTest(LogCaptureTestCase):
@ -304,6 +307,17 @@ class TestsUtilsTest(LogCaptureTestCase):
self.assertTrue(pindex > 10) # we should have some traceback
self.assertEqual(s[:pindex], s[pindex+1:pindex*2 + 1])
def testLazyLogging(self):
logSys = DefLogSys
if unittest.F2B.log_lazy:
# wrong logging syntax will throw an error lazy (on demand):
logSys.debug('test', 1, 2, 3)
self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: self.assertNotLogged('test'))
else: # pragma: no cover
# wrong logging syntax will throw an error directly:
self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: logSys.debug('test', 1, 2, 3))
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")

View File

@ -31,9 +31,10 @@ import tempfile
import shutil
import sys
import time
import threading
import unittest
from StringIO import StringIO
from cStringIO import StringIO
from functools import wraps
from ..helpers import getLogger
@ -59,12 +60,74 @@ if not CONFIG_DIR:
os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)))))
class F2B(optparse.Values):
def __init__(self, opts={}):
self.__dict__ = opts.__dict__ if opts else {
'fast': False, 'memory_db':False, 'no_gamin': False, 'no_network': False,
"negate_re": False,
# Default options, if running from installer (setup.py):
class DefaultTestOptions(optparse.Values):
def __init__(self):
self.__dict__ = {
'log_level': None, 'log_lazy': True,
'log_traceback': None, 'full_traceback': None,
'fast': False, 'memory_db': False, 'no_gamin': False,
'no_network': False, 'negate_re': False
}
#
# Initialization
#
def initProcess(opts):
# Logger:
logSys = getLogger("fail2ban")
# Numerical level of verbosity corresponding to a log "level"
verbosity = {'heavydebug': 4,
'debug': 3,
'info': 2,
'notice': 2,
'warning': 1,
'error': 1,
'critical': 0,
None: 1}[opts.log_level]
opts.verbosity = verbosity
if opts.log_level is not None: # pragma: no cover
# so we had explicit settings
logSys.setLevel(getattr(logging, opts.log_level.upper()))
else: # pragma: no cover
# suppress the logging but it would leave unittests' progress dots
# ticking, unless like with '-l critical' which would be silent
# unless error occurs
logSys.setLevel(logging.CRITICAL)
opts.log_level = logSys.level
# Add the default logging handler
stdout = logging.StreamHandler(sys.stdout)
fmt = ' %(message)s'
if opts.log_traceback: # pragma: no cover
from ..helpers import FormatterWithTraceBack as Formatter
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
else:
Formatter = logging.Formatter
# Custom log format for the verbose tests runs
if verbosity > 1: # pragma: no cover
if verbosity > 3:
fmt = ' | %(module)15.15s-%(levelno)-2d: %(funcName)-20.20s |' + fmt
if verbosity > 2:
fmt = ' +%(relativeCreated)5d %(thread)X %(name)-25.25s %(levelname)-5.5s' + fmt
else:
fmt = ' %(asctime)-15s %(thread)X %(levelname)-5.5s' + fmt
#
stdout.setFormatter(Formatter(fmt))
logSys.addHandler(stdout)
#
return opts;
class F2B(DefaultTestOptions):
def __init__(self, opts):
self.__dict__ = opts.__dict__
if self.fast:
self.memory_db = True
self.no_gamin = True
@ -96,7 +159,27 @@ def with_tmpdir(f):
return wrapper
# backwards compatibility to python 2.6:
if not hasattr(unittest, 'SkipTest'): # pragma: no cover
class SkipTest(Exception):
pass
unittest.SkipTest = SkipTest
_org_AddError = unittest._TextTestResult.addError
def addError(self, test, err):
if err[0] is SkipTest:
if self.showAll:
self.stream.writeln(str(err[1]))
elif self.dots:
self.stream.write('s')
self.stream.flush()
return
_org_AddError(self, test, err)
unittest._TextTestResult.addError = addError
def initTests(opts):
## if running from installer (setup.py):
if not opts:
opts = initProcess(DefaultTestOptions())
unittest.F2B = F2B(opts)
# --fast :
if unittest.F2B.fast: # pragma: no cover
@ -104,10 +187,9 @@ def initTests(opts):
# (prevent long sleeping during test cases ... less time goes to sleep):
Utils.DEFAULT_SLEEP_TIME = 0.0025
Utils.DEFAULT_SLEEP_INTERVAL = 0.0005
if sys.version_info >= (2,7): # no skip in previous version:
def F2B_SkipIfFast():
raise unittest.SkipTest('Skip test because of "--fast"')
unittest.F2B.SkipIfFast = F2B_SkipIfFast
def F2B_SkipIfFast():
raise unittest.SkipTest('Skip test because of "--fast"')
unittest.F2B.SkipIfFast = F2B_SkipIfFast
else:
# sleep intervals are large - use replacement for sleep to check time to sleep:
_org_sleep = time.sleep
@ -368,6 +450,66 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
class LogCaptureTestCase(unittest.TestCase):
class _MemHandler(logging.Handler):
"""Logging handler helper
Affords not to delegate logging to StreamHandler at all,
format lazily on demand in getvalue.
Increases performance inside the LogCaptureTestCase tests, because there
the log level set to DEBUG.
"""
def __init__(self, lazy=True):
self._lock = threading.Lock()
self._val = None
self._recs = list()
self._strm = StringIO()
logging.Handler.__init__(self)
if lazy:
self.handle = self._handle_lazy
def truncate(self, size=None):
"""Truncate the internal buffer and records."""
if size:
raise Exception('invalid size argument: %r, should be None or 0' % size)
with self._lock:
self._strm.truncate(0)
self._val = None
self._recs = list()
def __write(self, record):
msg = record.getMessage() + '\n'
try:
self._strm.write(msg)
except UnicodeEncodeError:
self._strm.write(msg.encode('UTF-8'))
def getvalue(self):
"""Return current buffer as whole string."""
with self._lock:
# cached:
if self._val is not None:
return self._val
# submit already emitted (delivered to handle) records:
for record in self._recs:
self.__write(record)
self._recs = list()
# cache and return:
self._val = self._strm.getvalue()
return self._val
def handle(self, record): # pragma: no cover
"""Handle the specified record direct (not lazy)"""
with self._lock:
self._val = None
self.__write(record)
def _handle_lazy(self, record):
"""Lazy handle the specified record on demand"""
with self._lock:
self._val = None
self._recs.append(record)
def setUp(self):
# For extended testing of what gets output into logging
@ -378,13 +520,13 @@ class LogCaptureTestCase(unittest.TestCase):
self._old_level = logSys.level
self._old_handlers = logSys.handlers
# Let's log everything into a string
self._log = StringIO()
logSys.handlers = [logging.StreamHandler(self._log)]
self._log = LogCaptureTestCase._MemHandler(unittest.F2B.log_lazy)
logSys.handlers = [self._log]
if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)!
print("")
logSys.handlers += self._old_handlers
logSys.debug('='*10 + ' %s ' + '='*20, self.id())
logSys.setLevel(getattr(logging, 'DEBUG'))
logSys.setLevel(logging.DEBUG)
def tearDown(self):
"""Call after every test case."""

View File

@ -27,21 +27,26 @@ import platform
try:
import setuptools
from setuptools import setup
from setuptools.command.install import install
from setuptools.command.install_scripts import install_scripts
except ImportError:
setuptools = None
from distutils.core import setup
# all versions
from distutils.command.build_py import build_py
from distutils.command.build_scripts import build_scripts
if setuptools is None:
from distutils.command.install import install
from distutils.command.install_scripts import install_scripts
try:
# python 3.x
from distutils.command.build_py import build_py_2to3 as build_py
from distutils.command.build_scripts \
import build_scripts_2to3 as build_scripts
from distutils.command.build_py import build_py_2to3
from distutils.command.build_scripts import build_scripts_2to3
_2to3 = True
except ImportError:
# python 2.x
from distutils.command.build_py import build_py
from distutils.command.build_scripts import build_scripts
# all versions
from distutils.command.install_scripts import install_scripts
_2to3 = False
import os
from os.path import isfile, join, isdir, realpath
@ -66,6 +71,27 @@ class install_scripts_f2b(install_scripts):
updatePyExec(bindir)
return outputs
# Wrapper to specify fail2ban own options:
class install_command_f2b(install):
user_options = install.user_options + [
('disable-2to3', None, 'Specify to deactivate 2to3, e.g. if the install runs from fail2ban test-cases.'),
]
def initialize_options(self):
self.disable_2to3 = None
install.initialize_options(self)
def finalize_options(self):
global _2to3
## in the test cases 2to3 should be already done (fail2ban-2to3):
if self.disable_2to3:
_2to3 = False
if _2to3:
cmdclass = self.distribution.cmdclass
cmdclass['build_py'] = build_py_2to3
cmdclass['build_scripts'] = build_scripts_2to3
install.finalize_options(self)
def run(self):
install.run(self)
# Update fail2ban-python env to current python version (where f2b-modules located/installed)
rootdir = os.path.realpath(os.path.dirname(
@ -143,7 +169,7 @@ setup(
platforms = "Posix",
cmdclass = {
'build_py': build_py, 'build_scripts': build_scripts,
'install_scripts': install_scripts_f2b
'install_scripts': install_scripts_f2b, 'install': install_command_f2b
},
scripts = [
'bin/fail2ban-client',