mirror of https://github.com/fail2ban/fail2ban
assertLogged extended with parameter wait (to wait up to specified timeout, before we throw assert exception) + test cases rewritten using that
parent
a20f325f80
commit
e0347bb3a0
|
@ -737,11 +737,9 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
if DefLogSys.level < logging.DEBUG: # if HEAVYDEBUG
|
||||
_out_file(test1log)
|
||||
self.execSuccess(startparams, "reload")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: \
|
||||
self._is_logged("Reload finished.") and
|
||||
self._is_logged("1 ticket(s) in 'test-jail1")
|
||||
, MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"Reload finished.",
|
||||
"1 ticket(s) in 'test-jail1", all=True, wait=MID_WAITTIME)
|
||||
self.assertLogged("Added logfile: %r" % test1log)
|
||||
self.assertLogged("[test-jail1] Ban 192.0.2.1")
|
||||
# test actions started:
|
||||
|
@ -757,8 +755,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
if DefLogSys.level < logging.DEBUG: # if HEAVYDEBUG
|
||||
_out_file(test1log)
|
||||
self.execSuccess(startparams, "reload")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: self._is_logged("Reload finished."), MID_WAITTIME))
|
||||
self.assertLogged("Reload finished.", all=True, wait=MID_WAITTIME)
|
||||
# test not unbanned / banned again:
|
||||
self.assertNotLogged(
|
||||
"[test-jail1] Unban 192.0.2.1",
|
||||
|
@ -784,8 +781,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
reload=" echo '[<name>] %s: reloaded.'" % "test-action1",
|
||||
stop= " echo '[<name>] %s: stopped.'" % "test-action1")
|
||||
self.execSuccess(startparams, "reload")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: self._is_logged("Reload finished."), MID_WAITTIME))
|
||||
self.assertLogged("Reload finished.", all=True, wait=MID_WAITTIME)
|
||||
# test not unbanned / banned again:
|
||||
self.assertNotLogged(
|
||||
"[test-jail1] Unban 192.0.2.1",
|
||||
|
@ -821,11 +817,9 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
if DefLogSys.level < logging.DEBUG: # if HEAVYDEBUG
|
||||
_out_file(test2log)
|
||||
# test all will be found in jail1 and one in jail2:
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: \
|
||||
self._is_logged("2 ticket(s) in 'test-jail2") and
|
||||
self._is_logged("5 ticket(s) in 'test-jail1")
|
||||
, MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"2 ticket(s) in 'test-jail2",
|
||||
"5 ticket(s) in 'test-jail1", all=True, wait=MID_WAITTIME)
|
||||
self.assertLogged(
|
||||
"[test-jail1] Ban 192.0.2.2",
|
||||
"[test-jail1] Ban 192.0.2.3",
|
||||
|
@ -848,12 +842,10 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
self.pruneLog("[test-phase 2c]")
|
||||
self.execSuccess(startparams,
|
||||
"restart", "test-jail2")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: \
|
||||
self._is_logged("Reload finished.") and
|
||||
self._is_logged("Restore Ban") and
|
||||
self._is_logged("2 ticket(s) in 'test-jail2")
|
||||
, MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"Reload finished.",
|
||||
"Restore Ban",
|
||||
"2 ticket(s) in 'test-jail2", all=True, wait=MID_WAITTIME)
|
||||
# stop/start and unban/restore ban:
|
||||
self.assertLogged(
|
||||
"Jail 'test-jail2' stopped",
|
||||
|
@ -868,11 +860,9 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
self.pruneLog("[test-phase 2d]")
|
||||
self.execSuccess(startparams,
|
||||
"restart", "--unban", "test-jail2")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: \
|
||||
self._is_logged("Reload finished.") and
|
||||
self._is_logged("Jail 'test-jail2' started")
|
||||
, MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"Reload finished.",
|
||||
"Jail 'test-jail2' started", all=True, wait=MID_WAITTIME)
|
||||
self.assertLogged(
|
||||
"Jail 'test-jail2' stopped",
|
||||
"Jail 'test-jail2' started",
|
||||
|
@ -888,8 +878,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
# reload jail1 without restart (without ban/unban):
|
||||
self.pruneLog("[test-phase 3]")
|
||||
self.execSuccess(startparams, "reload", "test-jail1")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: self._is_logged("Reload finished."), MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"Reload finished.", all=True, wait=MID_WAITTIME)
|
||||
self.assertLogged(
|
||||
"Reload jail 'test-jail1'",
|
||||
"Jail 'test-jail1' reloaded", all=True)
|
||||
|
@ -903,8 +893,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
self.pruneLog("[test-phase 4]")
|
||||
_write_jail_cfg(enabled=[1])
|
||||
self.execSuccess(startparams, "reload")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: self._is_logged("Reload finished."), MID_WAITTIME))
|
||||
self.assertLogged("Reload finished.", all=True, wait=MID_WAITTIME)
|
||||
# test both jails should be reloaded:
|
||||
self.assertLogged(
|
||||
"Reload jail 'test-jail1'")
|
||||
|
@ -926,11 +915,9 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
))
|
||||
if DefLogSys.level < logging.DEBUG: # if HEAVYDEBUG
|
||||
_out_file(test1log)
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: \
|
||||
self._is_logged("6 ticket(s) in 'test-jail1") and
|
||||
self._is_logged("[test-jail1] 192.0.2.1 already banned")
|
||||
, MID_WAITTIME))
|
||||
self.assertLogged(
|
||||
"6 ticket(s) in 'test-jail1",
|
||||
"[test-jail1] 192.0.2.1 already banned", all=True, wait=MID_WAITTIME)
|
||||
# test "failure" regexp still available:
|
||||
self.assertLogged(
|
||||
"[test-jail1] Found 192.0.2.1",
|
||||
|
@ -953,8 +940,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
self.pruneLog("[test-phase 7]")
|
||||
self.execSuccess(startparams,
|
||||
"reload", "--unban")
|
||||
self.assertTrue(
|
||||
Utils.wait_for(lambda: self._is_logged("Reload finished."), MID_WAITTIME))
|
||||
self.assertLogged("Reload finished.", all=True, wait=MID_WAITTIME)
|
||||
# reloads unbanned all:
|
||||
self.assertLogged(
|
||||
"Jail 'test-jail1' reloaded",
|
||||
|
|
|
@ -589,8 +589,21 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
logSys.handlers = self._old_handlers
|
||||
logSys.level = self._old_level
|
||||
|
||||
def _is_logged(self, s):
|
||||
return s in self._log.getvalue()
|
||||
def _is_logged(self, *s, **kwargs):
|
||||
logged = self._log.getvalue()
|
||||
if not kwargs.get('all', False):
|
||||
# at least one entry should be found:
|
||||
for s_ in s:
|
||||
if s_ in logged:
|
||||
return True
|
||||
if True: # pragma: no cover
|
||||
return False
|
||||
else:
|
||||
# each entry should be found:
|
||||
for s_ in s:
|
||||
if s_ not in logged: # pragma: no cover
|
||||
return False
|
||||
return True
|
||||
|
||||
def assertLogged(self, *s, **kwargs):
|
||||
"""Assert that one of the strings was logged
|
||||
|
@ -604,19 +617,23 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
Test should succeed if string (or any of the listed) is present in the log
|
||||
all : boolean (default False) if True should fail if any of s not logged
|
||||
"""
|
||||
logged = self._log.getvalue()
|
||||
wait = kwargs.get('wait', None)
|
||||
if wait:
|
||||
res = Utils.wait_for(lambda: self._is_logged(*s, **kwargs), wait)
|
||||
else:
|
||||
res = self._is_logged(*s, **kwargs)
|
||||
if not kwargs.get('all', False):
|
||||
# at least one entry should be found:
|
||||
for s_ in s:
|
||||
if s_ in logged:
|
||||
return
|
||||
if True: # pragma: no cover
|
||||
if not res: # pragma: no cover
|
||||
logged = self._log.getvalue()
|
||||
self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
|
||||
else:
|
||||
# each entry should be found:
|
||||
for s_ in s:
|
||||
if s_ not in logged: # pragma: no cover
|
||||
self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))
|
||||
if not res: # pragma: no cover
|
||||
logged = self._log.getvalue()
|
||||
for s_ in s:
|
||||
if s_ not in logged:
|
||||
self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))
|
||||
|
||||
def assertNotLogged(self, *s, **kwargs):
|
||||
"""Assert that strings were not logged
|
||||
|
|
Loading…
Reference in New Issue