RF(TST): self.assertTrue(self._is_logged()) -> self.assertLogged

and corresponding one for not + support for multiple entries at once,
and failure message listing actual log to ease troubleshooting
pull/1188/head^2
Yaroslav Halchenko 2015-09-12 19:59:55 -04:00
parent 5b655639ab
commit 5ed731d3b3
7 changed files with 91 additions and 54 deletions

View File

@ -94,15 +94,15 @@ class ExecuteActions(LogCaptureTestCase):
"Action", os.path.join(TEST_FILES_DIR, "action.d/action.py"),
{'opt1': 'value'})
self.assertTrue(self._is_logged("TestAction initialised"))
self.assertLogged("TestAction initialised")
self.__actions.start()
time.sleep(3)
self.assertTrue(self._is_logged("TestAction action start"))
self.assertLogged("TestAction action start")
self.__actions.stop()
self.__actions.join()
self.assertTrue(self._is_logged("TestAction action stop"))
self.assertLogged("TestAction action stop")
self.assertRaises(IOError,
self.__actions.add, "Action3", "/does/not/exist.py", {})
@ -136,10 +136,10 @@ class ExecuteActions(LogCaptureTestCase):
{})
self.__actions.start()
time.sleep(3)
self.assertTrue(self._is_logged("Failed to start"))
self.assertLogged("Failed to start")
self.__actions.stop()
self.__actions.join()
self.assertTrue(self._is_logged("Failed to stop"))
self.assertLogged("Failed to stop")
def testBanActionsAInfo(self):
# Action which deletes IP address from aInfo
@ -155,13 +155,13 @@ class ExecuteActions(LogCaptureTestCase):
self.__actions._Actions__checkBan()
# Will fail if modification of aInfo from first action propagates
# to second action, as both delete same key
self.assertFalse(self._is_logged("Failed to execute ban"))
self.assertTrue(self._is_logged("action1 ban deleted aInfo IP"))
self.assertTrue(self._is_logged("action2 ban deleted aInfo IP"))
self.assertNotLogged("Failed to execute ban")
self.assertLogged("action1 ban deleted aInfo IP")
self.assertLogged("action2 ban deleted aInfo IP")
self.__actions._Actions__flushBan()
# Will fail if modification of aInfo from first action propagates
# to second action, as both delete same key
self.assertFalse(self._is_logged("Failed to execute unban"))
self.assertTrue(self._is_logged("action1 unban deleted aInfo IP"))
self.assertTrue(self._is_logged("action2 unban deleted aInfo IP"))
self.assertNotLogged("Failed to execute unban")
self.assertLogged("action1 unban deleted aInfo IP")
self.assertLogged("action2 unban deleted aInfo IP")

View File

@ -143,17 +143,17 @@ class CommandActionTest(LogCaptureTestCase):
self.__action.actionunban = "true"
self.assertEqual(self.__action.actionunban, 'true')
self.assertFalse(self._is_logged('returned'))
self.assertNotLogged('returned')
# no action was actually executed yet
self.__action.ban({'ip': None})
self.assertTrue(self._is_logged('Invariant check failed'))
self.assertTrue(self._is_logged('returned successfully'))
self.assertLogged('Invariant check failed')
self.assertLogged('returned successfully')
def testExecuteActionEmptyUnban(self):
self.__action.actionunban = ""
self.__action.unban({})
self.assertTrue(self._is_logged('Nothing to do'))
self.assertLogged('Nothing to do')
def testExecuteActionStartCtags(self):
self.__action.HOST = "192.0.2.0"
@ -168,7 +168,7 @@ class CommandActionTest(LogCaptureTestCase):
self.__action.actionban = "rm /tmp/fail2ban.test"
self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
self.assertTrue(self._is_logged('Unable to restore environment'))
self.assertLogged('Unable to restore environment')
def testExecuteActionChangeCtags(self):
self.assertRaises(AttributeError, getattr, self.__action, "ROST")
@ -187,11 +187,11 @@ class CommandActionTest(LogCaptureTestCase):
def testExecuteActionStartEmpty(self):
self.__action.actionstart = ""
self.__action.start()
self.assertTrue(self._is_logged('Nothing to do'))
self.assertLogged('Nothing to do')
def testExecuteIncorrectCmd(self):
CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')
self.assertTrue(self._is_logged('HINT on 127: "Command not found"'))
self.assertLogged('HINT on 127: "Command not found"')
def testExecuteTimeout(self):
stime = time.time()
@ -200,9 +200,11 @@ class CommandActionTest(LogCaptureTestCase):
RuntimeError, CommandAction.executeCmd, 'sleep 60', timeout=2)
# give a test still 1 second, because system could be too busy
self.assertTrue(time.time() >= stime + 2 and time.time() <= stime + 3)
self.assertTrue(self._is_logged('sleep 60 -- timed out after 2 seconds')
or self._is_logged('sleep 60 -- timed out after 3 seconds'))
self.assertTrue(self._is_logged('sleep 60 -- killed with SIGTERM'))
self.assertLogged({
'sleep 60 -- timed out after 2 seconds',
'sleep 60 -- timed out after 3 seconds'
})
self.assertLogged('sleep 60 -- killed with SIGTERM')
def testExecuteTimeoutWithNastyChildren(self):
# temporary file for a nasty kid shell script
@ -226,8 +228,8 @@ class CommandActionTest(LogCaptureTestCase):
RuntimeError, CommandAction.executeCmd, 'bash %s' % tmpFilename, timeout=.1)
# Verify that the proccess itself got killed
self.assertFalse(pid_exists(getnastypid())) # process should have been killed
self.assertTrue(self._is_logged('timed out'))
self.assertTrue(self._is_logged('killed with SIGTERM'))
self.assertLogged('timed out')
self.assertLogged('killed with SIGTERM')
# A bit evolved case even though, previous test already tests killing children processes
self.assertRaises(
@ -235,8 +237,8 @@ class CommandActionTest(LogCaptureTestCase):
timeout=.2)
# Verify that the proccess itself got killed
self.assertFalse(pid_exists(getnastypid()))
self.assertTrue(self._is_logged('timed out'))
self.assertTrue(self._is_logged('killed with SIGTERM'))
self.assertLogged('timed out')
self.assertLogged('killed with SIGTERM')
os.unlink(tmpFilename)
os.unlink(tmpFilename + '.pid')
@ -244,11 +246,11 @@ class CommandActionTest(LogCaptureTestCase):
def testCaptureStdOutErr(self):
CommandAction.executeCmd('echo "How now brown cow"')
self.assertTrue(self._is_logged("'How now brown cow\\n'"))
self.assertLogged("'How now brown cow\\n'")
CommandAction.executeCmd(
'echo "The rain in Spain stays mainly in the plain" 1>&2')
self.assertTrue(self._is_logged(
"'The rain in Spain stays mainly in the plain\\n'"))
self.assertLogged(
"'The rain in Spain stays mainly in the plain\\n'")
def testCallingMap(self):
mymap = CallingMap(callme=lambda: str(10), error=lambda: int('a'),

View File

@ -173,16 +173,16 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(jail.read())
self.assertTrue(jail.getOptions())
self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged('No filter set for jail emptyaction'))
self.assertTrue(self._is_logged('No actions were defined for emptyaction'))
self.assertLogged('No filter set for jail emptyaction')
self.assertLogged('No actions were defined for emptyaction')
def testJailActionFilterMissing(self):
jail = JailReader('missingbitsjail', basedir=IMPERFECT_CONFIG, share_config = self.__share_cfg)
self.assertTrue(jail.read())
self.assertFalse(jail.getOptions())
self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG))
self.assertTrue(self._is_logged('Unable to read the filter'))
self.assertLogged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG)
self.assertLogged('Unable to read the filter')
def testJailActionBrokenDef(self):
jail = JailReader('brokenactiondef', basedir=IMPERFECT_CONFIG,
@ -190,13 +190,13 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(jail.read())
self.assertFalse(jail.getOptions())
self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged('Error in action definition joho[foo'))
self.assertLogged('Error in action definition joho[foo')
# This unittest has been deactivated for some time...
# self.assertTrue(self._is_logged(
# 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0'))
# self.assertLogged(
# 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0')
# let's test for what is actually logged and handle changes in the future
self.assertTrue(self._is_logged(
"Caught exception: 'NoneType' object has no attribute 'endswith'"))
self.assertLogged(
"Caught exception: 'NoneType' object has no attribute 'endswith'")
if STOCK:
def testStockSSHJail(self):
@ -221,7 +221,7 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is['s']"))
#self.printLog()
#self.assertTrue(self._is_logged("Invalid argument ['s'] in ''s''"))
#self.assertLogged("Invalid argument ['s'] in ''s''")
self.assertEqual(('mail', {'a': ','}), JailReader.extractOptions("mail[a=',']"))
@ -265,7 +265,7 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(JailReader._glob(os.path.join(d, '*')), [f1])
# since f2 is dangling -- empty list
self.assertEqual(JailReader._glob(f2), [])
self.assertTrue(self._is_logged('File %s is a dangling link, thus cannot be monitored' % f2))
self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
os.remove(f1)
os.remove(f2)
@ -463,8 +463,8 @@ class JailsReaderTest(LogCaptureTestCase):
['start', 'missinglogfiles'],
['start', 'brokenaction'],
['start', 'parse_to_end_of_jail.conf'],]))
self.assertTrue(self._is_logged("Errors in jail 'missingbitsjail'. Skipping..."))
self.assertTrue(self._is_logged("No file(s) found for glob /weapons/of/mass/destruction"))
self.assertLogged("Errors in jail 'missingbitsjail'. Skipping...")
self.assertLogged("No file(s) found for glob /weapons/of/mass/destruction")
if STOCK:
def testReadStockActionConf(self):
@ -496,7 +496,7 @@ class JailsReaderTest(LogCaptureTestCase):
#old_comm_commands = comm_commands[:] # make a copy
#self.assertRaises(ValueError, jails.getOptions, "BOGUS")
#self.printLog()
#self.assertTrue(self._is_logged("No section: 'BOGUS'"))
#self.assertLogged("No section: 'BOGUS'")
## and there should be no side-effects
#self.assertEqual(jails.convert(), old_comm_commands)

View File

@ -316,7 +316,7 @@ class DatabaseTest(LogCaptureTestCase):
ticket.setAttempt(5)
self.jail.putFailTicket(ticket)
actions._Actions__checkBan()
self.assertTrue(self._is_logged("ban ainfo %s, %s, %s, %s" % (True, True, True, True)))
self.assertLogged("ban ainfo %s, %s, %s, %s" % (True, True, True, True))
def testPurge(self):
if Fail2BanDb is None: # pragma: no cover

View File

@ -260,14 +260,14 @@ class IgnoreIP(LogCaptureTestCase):
self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addFailRegex('<HOST>')
self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
self.assertTrue(self._is_logged('Ignore 192.168.1.32'))
self.assertLogged('Ignore 192.168.1.32')
tearDownMyTime()
def testIgnoreAddBannedIP(self):
self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addBannedIP('192.168.1.32')
self.assertFalse(self._is_logged('Ignore 192.168.1.32'))
self.assertTrue(self._is_logged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.'))
self.assertNotLogged('Ignore 192.168.1.32')
self.assertLogged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.')
def testIgnoreCommand(self):
self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>"))
@ -278,11 +278,11 @@ class IgnoreIP(LogCaptureTestCase):
ip = "93.184.216.34"
for ignore_source in ["dns", "ip", "command"]:
self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
self.assertTrue(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)))
self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))
def testIgnoreCauseNOK(self):
self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
self.assertFalse(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")))
self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
class IgnoreIPDNS(IgnoreIP):
@ -382,18 +382,17 @@ class LogFileMonitor(LogCaptureTestCase):
def testNoLogFile(self):
_killfile(self.file, self.name)
self.filter.getFailures(self.name)
failure_was_logged = self._is_logged('Unable to open %s' % self.name)
self.assertTrue(failure_was_logged)
self.assertLogged('Unable to open %s' % self.name)
def testRemovingFailRegex(self):
self.filter.delFailRegex(0)
self.assertFalse(self._is_logged('Cannot remove regular expression. Index 0 is not valid'))
self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid')
self.filter.delFailRegex(0)
self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid'))
self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
def testRemovingIgnoreRegex(self):
self.filter.delIgnoreRegex(0)
self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid'))
self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
def testNewChangeViaIsModified(self):
# it is a brand new one -- so first we think it is modified

View File

@ -934,7 +934,7 @@ class LoggingTests(LogCaptureTestCase):
badThread = _BadThread()
badThread.start()
badThread.join()
self.assertTrue(self._is_logged("Unhandled exception"))
self.assertLogged("Unhandled exception")
finally:
sys.__excepthook__ = prev_exchook
self.assertEqual(len(x), 1)

View File

@ -232,6 +232,42 @@ class LogCaptureTestCase(unittest.TestCase):
def _is_logged(self, s):
return s in self._log.getvalue()
def assertLogged(self, s):
"""Assert that a string was logged
Preferable to assertTrue(self._is_logged(..)))
since provides message with the actual log.
Parameters
----------
s : string or list/set/tuple of strings
Test should succeed if string (or any of the listed) is present in the log
"""
logged = self._log.getvalue()
s_iter = s if isinstance(s, (tuple, list, set)) else [s]
for s_ in s_iter:
if s_ in logged:
return
raise AssertionError("%r was not found in the log: %r" % (s, logged))
def assertNotLogged(self, s):
"""Assert that a string was not logged
Parameters
----------
s : string or list/set/tuple of strings
Test should succeed if the string (or at least one of the listed) is not present in the log
"""
logged = self._log.getvalue()
s_iter = s if isinstance(s, (tuple, list, set)) else [s]
for s_ in s_iter:
if s_ not in logged:
return
raise AssertionError("%r was found present in the log: %r" % (s, logged))
def getLog(self):
return self._log.getvalue()