mirror of https://github.com/fail2ban/fail2ban
stability of time-related test-cases: a bit increased timeouts; code normalization, review and coverage
parent
92f19d0604
commit
71b19d9eba
|
@ -55,8 +55,8 @@ CLIENT = "fail2ban-client"
|
|||
SERVER = "fail2ban-server"
|
||||
BIN = dirname(Fail2banServer.getServerPath())
|
||||
|
||||
MAX_WAITTIME = 30 if not unittest.F2B.fast else 5
|
||||
MID_WAITTIME = MAX_WAITTIME
|
||||
MAX_WAITTIME = unittest.F2B.maxWaitTime(unittest.F2B.MAX_WAITTIME)
|
||||
MID_WAITTIME = unittest.F2B.maxWaitTime(unittest.F2B.MID_WAITTIME)
|
||||
|
||||
##
|
||||
# Several wrappers and settings for proper testing:
|
||||
|
|
|
@ -82,10 +82,7 @@ def _killfile(f, name):
|
|||
_killfile(None, name + '.bak')
|
||||
|
||||
|
||||
def _maxWaitTime(wtime):
|
||||
if unittest.F2B.fast: # pragma: no cover
|
||||
wtime /= 10.0
|
||||
return wtime
|
||||
_maxWaitTime = unittest.F2B.maxWaitTime
|
||||
|
||||
|
||||
class _tmSerial():
|
||||
|
@ -657,12 +654,12 @@ class LogFileMonitor(LogCaptureTestCase):
|
|||
_killfile(self.file, self.name)
|
||||
pass
|
||||
|
||||
def isModified(self, delay=2.):
|
||||
def isModified(self, delay=2):
|
||||
"""Wait up to `delay` sec to assure that it was modified or not
|
||||
"""
|
||||
return Utils.wait_for(lambda: self.filter.isModified(self.name), _maxWaitTime(delay))
|
||||
|
||||
def notModified(self, delay=2.):
|
||||
def notModified(self, delay=2):
|
||||
"""Wait up to `delay` sec as long as it was not modified
|
||||
"""
|
||||
return Utils.wait_for(lambda: not self.filter.isModified(self.name), _maxWaitTime(delay))
|
||||
|
@ -817,7 +814,7 @@ class CommonMonitorTestCase(unittest.TestCase):
|
|||
super(CommonMonitorTestCase, self).setUp()
|
||||
self._failTotal = 0
|
||||
|
||||
def waitFailTotal(self, count, delay=1.):
|
||||
def waitFailTotal(self, count, delay=1):
|
||||
"""Wait up to `delay` sec to assure that expected failure `count` reached
|
||||
"""
|
||||
ret = Utils.wait_for(
|
||||
|
@ -826,7 +823,7 @@ class CommonMonitorTestCase(unittest.TestCase):
|
|||
self._failTotal += count
|
||||
return ret
|
||||
|
||||
def isFilled(self, delay=1.):
|
||||
def isFilled(self, delay=1):
|
||||
"""Wait up to `delay` sec to assure that it was modified or not
|
||||
"""
|
||||
return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay))
|
||||
|
@ -836,7 +833,7 @@ class CommonMonitorTestCase(unittest.TestCase):
|
|||
"""
|
||||
return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay))
|
||||
|
||||
def waitForTicks(self, ticks, delay=2.):
|
||||
def waitForTicks(self, ticks, delay=2):
|
||||
"""Wait up to `delay` sec to assure that it was modified or not
|
||||
"""
|
||||
last_ticks = self.filter.ticks
|
||||
|
|
|
@ -287,6 +287,20 @@ class TestsUtilsTest(LogCaptureTestCase):
|
|||
self.assertNotLogged('test "xyz"')
|
||||
self.assertNotLogged('test', 'xyz', all=False)
|
||||
self.assertNotLogged('test', 'xyz', 'zyx', all=True)
|
||||
## maxWaitTime:
|
||||
orgfast, unittest.F2B.fast = unittest.F2B.fast, False
|
||||
self.assertFalse(isinstance(unittest.F2B.maxWaitTime(True), bool))
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(lambda: 50)(), 50)
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(25), 25)
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(25.), 25.0)
|
||||
unittest.F2B.fast = True
|
||||
try:
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(lambda: 50)(), 50)
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(25), 2.5)
|
||||
self.assertEqual(unittest.F2B.maxWaitTime(25.), 25.0)
|
||||
finally:
|
||||
unittest.F2B.fast = orgfast
|
||||
self.assertFalse(unittest.F2B.maxWaitTime(False))
|
||||
## assertLogged, assertNotLogged negative case:
|
||||
self.pruneLog()
|
||||
logSys.debug('test "xyz"')
|
||||
|
@ -296,8 +310,12 @@ class TestsUtilsTest(LogCaptureTestCase):
|
|||
self.assertNotLogged, 'test', 'xyz', all=True)
|
||||
self._testAssertionErrorRE(r"was not found in the log",
|
||||
self.assertLogged, 'test', 'zyx', all=True)
|
||||
self._testAssertionErrorRE(r"was not found in the log, waited 1e-06",
|
||||
self.assertLogged, 'test', 'zyx', all=True, wait=1e-6)
|
||||
self._testAssertionErrorRE(r"None among .* was found in the log",
|
||||
self.assertLogged, 'test_zyx', 'zyx', all=False)
|
||||
self._testAssertionErrorRE(r"None among .* was found in the log, waited 1e-06",
|
||||
self.assertLogged, 'test_zyx', 'zyx', all=False, wait=1e-6)
|
||||
self._testAssertionErrorRE(r"All of the .* were found present in the log",
|
||||
self.assertNotLogged, 'test', 'xyz', all=False)
|
||||
## assertDictEqual:
|
||||
|
|
|
@ -180,6 +180,10 @@ def initProcess(opts):
|
|||
|
||||
|
||||
class F2B(DefaultTestOptions):
|
||||
|
||||
MAX_WAITTIME = 60
|
||||
MID_WAITTIME = 30
|
||||
|
||||
def __init__(self, opts):
|
||||
self.__dict__ = opts.__dict__
|
||||
if self.fast:
|
||||
|
@ -215,8 +219,12 @@ class F2B(DefaultTestOptions):
|
|||
return wrapper
|
||||
return _deco_wrapper
|
||||
|
||||
def maxWaitTime(self,wtime):
|
||||
if self.fast:
|
||||
def maxWaitTime(self, wtime=True):
|
||||
if isinstance(wtime, bool) and wtime:
|
||||
wtime = self.MAX_WAITTIME
|
||||
# short only integer interval (avoid by conditional wait with callable, and dual
|
||||
# wrapping in some routines, if it will be called twice):
|
||||
if self.fast and isinstance(wtime, int):
|
||||
wtime = float(wtime) / 10
|
||||
return wtime
|
||||
|
||||
|
@ -761,21 +769,24 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
"""
|
||||
wait = kwargs.get('wait', None)
|
||||
if wait:
|
||||
wait = unittest.F2B.maxWaitTime(wait)
|
||||
res = Utils.wait_for(lambda: self._is_logged(*s, **kwargs), wait)
|
||||
else:
|
||||
res = self._is_logged(*s, **kwargs)
|
||||
if not kwargs.get('all', False):
|
||||
# at least one entry should be found:
|
||||
if not res: # pragma: no cover
|
||||
if not res:
|
||||
logged = self._log.getvalue()
|
||||
self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
|
||||
self.fail("None among %r was found in the log%s: ===\n%s===" % (s,
|
||||
((', waited %s' % wait) if wait else ''), logged))
|
||||
else:
|
||||
# each entry should be found:
|
||||
if not res: # pragma: no cover
|
||||
if not res:
|
||||
logged = self._log.getvalue()
|
||||
for s_ in s:
|
||||
if s_ not in logged:
|
||||
self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))
|
||||
self.fail("%r was not found in the log%s: ===\n%s===" % (s_,
|
||||
((', waited %s' % wait) if wait else ''), logged))
|
||||
|
||||
def assertNotLogged(self, *s, **kwargs):
|
||||
"""Assert that strings were not logged
|
||||
|
@ -792,11 +803,10 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
for s_ in s:
|
||||
if s_ not in logged:
|
||||
return
|
||||
if True: # pragma: no cover
|
||||
self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
|
||||
self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
|
||||
else:
|
||||
for s_ in s:
|
||||
if s_ in logged: # pragma: no cover
|
||||
if s_ in logged:
|
||||
self.fail("%r was found in the log: ===\n%s===" % (s_, logged))
|
||||
|
||||
def pruneLog(self, logphase=None):
|
||||
|
|
Loading…
Reference in New Issue