From b2ee3b120e6629ff65dcc1c4523db918b514131e Mon Sep 17 00:00:00 2001 From: sebres Date: Fri, 12 Aug 2016 16:39:40 +0200 Subject: [PATCH] optimized logging using LogCaptureTestCase (new faster handler _MemHandler); backwards compatibility for python 2.6 (SkipTest implemented), so options --fast and --no-network may be used now within python2.6 also; --- fail2ban/tests/utils.py | 64 +++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index f6333fb8..3f0822dd 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -33,7 +33,7 @@ import sys import time import unittest -from StringIO import StringIO +from cStringIO import StringIO from functools import wraps from ..helpers import getLogger @@ -96,6 +96,23 @@ def with_tmpdir(f): return wrapper +# backwards compatibility to python 2.6: +if not hasattr(unittest, 'SkipTest'): # pragma: no cover + class SkipTest(Exception): + pass + unittest.SkipTest = SkipTest + _org_AddError = unittest._TextTestResult.addError + def addError(self, test, err): + if err[0] is SkipTest: + if self.showAll: + self.stream.writeln(str(err[1])) + elif self.dots: + self.stream.write('s') + self.stream.flush() + return + _org_AddError(self, test, err) + unittest._TextTestResult.addError = addError + def initTests(opts): unittest.F2B = F2B(opts) # --fast : @@ -104,10 +121,9 @@ def initTests(opts): # (prevent long sleeping during test cases ... less time goes to sleep): Utils.DEFAULT_SLEEP_TIME = 0.0025 Utils.DEFAULT_SLEEP_INTERVAL = 0.0005 - if sys.version_info >= (2,7): # no skip in previous version: - def F2B_SkipIfFast(): - raise unittest.SkipTest('Skip test because of "--fast"') - unittest.F2B.SkipIfFast = F2B_SkipIfFast + def F2B_SkipIfFast(): + raise unittest.SkipTest('Skip test because of "--fast"') + unittest.F2B.SkipIfFast = F2B_SkipIfFast else: # sleep intervals are large - use replacement for sleep to check time to sleep: _org_sleep = time.sleep @@ -368,6 +384,38 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'): class LogCaptureTestCase(unittest.TestCase): + class _MemHandler(logging.StreamHandler): + def __init__(self): + self._recs = list() + self._strm = StringIO() + logging.StreamHandler.__init__(self, self._strm) + # + def truncate(self, size=None): + """Truncate the internal buffer and records.""" + if size: + raise Exception('invalid size argument: %r, should be None or 0' % size) + self._strm.truncate(0) + self._val = None + self._recs = list() + # + def getvalue(self): + """Return current buffer as whole string.""" + # cached: + if self._val is not None: + return self._val + # submit already emitted (delivered to handle) records: + for record in self._recs: + logging.StreamHandler.handle(self, record) + self._recs = list() + # cache and return: + self._val = self._strm.getvalue() + return self._val + # + def handle(self, record): + """Handle the specified record.""" + self._val = None + self._recs.append(record) + def setUp(self): # For extended testing of what gets output into logging @@ -378,13 +426,13 @@ class LogCaptureTestCase(unittest.TestCase): self._old_level = logSys.level self._old_handlers = logSys.handlers # Let's log everything into a string - self._log = StringIO() - logSys.handlers = [logging.StreamHandler(self._log)] + self._log = LogCaptureTestCase._MemHandler() + logSys.handlers = [self._log] if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)! print("") logSys.handlers += self._old_handlers logSys.debug('='*10 + ' %s ' + '='*20, self.id()) - logSys.setLevel(getattr(logging, 'DEBUG')) + logSys.setLevel(logging.DEBUG) def tearDown(self): """Call after every test case."""