mirror of https://github.com/fail2ban/fail2ban
optimized logging using LogCaptureTestCase (new faster handler _MemHandler);
backwards compatibility for python 2.6 (SkipTest implemented), so options --fast and --no-network may be used now within python2.6 also;pull/1510/head
parent
461e9bab56
commit
b2ee3b120e
|
@ -33,7 +33,7 @@ import sys
|
|||
import time
|
||||
import unittest
|
||||
|
||||
from StringIO import StringIO
|
||||
from cStringIO import StringIO
|
||||
from functools import wraps
|
||||
|
||||
from ..helpers import getLogger
|
||||
|
@ -96,6 +96,23 @@ def with_tmpdir(f):
|
|||
return wrapper
|
||||
|
||||
|
||||
# backwards compatibility to python 2.6:
|
||||
if not hasattr(unittest, 'SkipTest'): # pragma: no cover
|
||||
class SkipTest(Exception):
|
||||
pass
|
||||
unittest.SkipTest = SkipTest
|
||||
_org_AddError = unittest._TextTestResult.addError
|
||||
def addError(self, test, err):
|
||||
if err[0] is SkipTest:
|
||||
if self.showAll:
|
||||
self.stream.writeln(str(err[1]))
|
||||
elif self.dots:
|
||||
self.stream.write('s')
|
||||
self.stream.flush()
|
||||
return
|
||||
_org_AddError(self, test, err)
|
||||
unittest._TextTestResult.addError = addError
|
||||
|
||||
def initTests(opts):
|
||||
unittest.F2B = F2B(opts)
|
||||
# --fast :
|
||||
|
@ -104,10 +121,9 @@ def initTests(opts):
|
|||
# (prevent long sleeping during test cases ... less time goes to sleep):
|
||||
Utils.DEFAULT_SLEEP_TIME = 0.0025
|
||||
Utils.DEFAULT_SLEEP_INTERVAL = 0.0005
|
||||
if sys.version_info >= (2,7): # no skip in previous version:
|
||||
def F2B_SkipIfFast():
|
||||
raise unittest.SkipTest('Skip test because of "--fast"')
|
||||
unittest.F2B.SkipIfFast = F2B_SkipIfFast
|
||||
def F2B_SkipIfFast():
|
||||
raise unittest.SkipTest('Skip test because of "--fast"')
|
||||
unittest.F2B.SkipIfFast = F2B_SkipIfFast
|
||||
else:
|
||||
# sleep intervals are large - use replacement for sleep to check time to sleep:
|
||||
_org_sleep = time.sleep
|
||||
|
@ -368,6 +384,38 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
|
|||
|
||||
class LogCaptureTestCase(unittest.TestCase):
|
||||
|
||||
class _MemHandler(logging.StreamHandler):
|
||||
def __init__(self):
|
||||
self._recs = list()
|
||||
self._strm = StringIO()
|
||||
logging.StreamHandler.__init__(self, self._strm)
|
||||
#
|
||||
def truncate(self, size=None):
|
||||
"""Truncate the internal buffer and records."""
|
||||
if size:
|
||||
raise Exception('invalid size argument: %r, should be None or 0' % size)
|
||||
self._strm.truncate(0)
|
||||
self._val = None
|
||||
self._recs = list()
|
||||
#
|
||||
def getvalue(self):
|
||||
"""Return current buffer as whole string."""
|
||||
# cached:
|
||||
if self._val is not None:
|
||||
return self._val
|
||||
# submit already emitted (delivered to handle) records:
|
||||
for record in self._recs:
|
||||
logging.StreamHandler.handle(self, record)
|
||||
self._recs = list()
|
||||
# cache and return:
|
||||
self._val = self._strm.getvalue()
|
||||
return self._val
|
||||
#
|
||||
def handle(self, record):
|
||||
"""Handle the specified record."""
|
||||
self._val = None
|
||||
self._recs.append(record)
|
||||
|
||||
def setUp(self):
|
||||
|
||||
# For extended testing of what gets output into logging
|
||||
|
@ -378,13 +426,13 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
self._old_level = logSys.level
|
||||
self._old_handlers = logSys.handlers
|
||||
# Let's log everything into a string
|
||||
self._log = StringIO()
|
||||
logSys.handlers = [logging.StreamHandler(self._log)]
|
||||
self._log = LogCaptureTestCase._MemHandler()
|
||||
logSys.handlers = [self._log]
|
||||
if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)!
|
||||
print("")
|
||||
logSys.handlers += self._old_handlers
|
||||
logSys.debug('='*10 + ' %s ' + '='*20, self.id())
|
||||
logSys.setLevel(getattr(logging, 'DEBUG'))
|
||||
logSys.setLevel(logging.DEBUG)
|
||||
|
||||
def tearDown(self):
|
||||
"""Call after every test case."""
|
||||
|
|
Loading…
Reference in New Issue