Merge pull request #1188 from yarikoptic/rf-assertLogged

RF(TST): self.assertTrue(self._is_logged()) -> self.assertLogged
pull/1191/head
Yaroslav Halchenko 2015-09-15 09:14:58 -04:00
commit 41edfe8caf
7 changed files with 88 additions and 54 deletions

View File

@ -94,15 +94,15 @@ class ExecuteActions(LogCaptureTestCase):
"Action", os.path.join(TEST_FILES_DIR, "action.d/action.py"), "Action", os.path.join(TEST_FILES_DIR, "action.d/action.py"),
{'opt1': 'value'}) {'opt1': 'value'})
self.assertTrue(self._is_logged("TestAction initialised")) self.assertLogged("TestAction initialised")
self.__actions.start() self.__actions.start()
time.sleep(3) time.sleep(3)
self.assertTrue(self._is_logged("TestAction action start")) self.assertLogged("TestAction action start")
self.__actions.stop() self.__actions.stop()
self.__actions.join() self.__actions.join()
self.assertTrue(self._is_logged("TestAction action stop")) self.assertLogged("TestAction action stop")
self.assertRaises(IOError, self.assertRaises(IOError,
self.__actions.add, "Action3", "/does/not/exist.py", {}) self.__actions.add, "Action3", "/does/not/exist.py", {})
@ -136,10 +136,10 @@ class ExecuteActions(LogCaptureTestCase):
{}) {})
self.__actions.start() self.__actions.start()
time.sleep(3) time.sleep(3)
self.assertTrue(self._is_logged("Failed to start")) self.assertLogged("Failed to start")
self.__actions.stop() self.__actions.stop()
self.__actions.join() self.__actions.join()
self.assertTrue(self._is_logged("Failed to stop")) self.assertLogged("Failed to stop")
def testBanActionsAInfo(self): def testBanActionsAInfo(self):
# Action which deletes IP address from aInfo # Action which deletes IP address from aInfo
@ -155,13 +155,13 @@ class ExecuteActions(LogCaptureTestCase):
self.__actions._Actions__checkBan() self.__actions._Actions__checkBan()
# Will fail if modification of aInfo from first action propagates # Will fail if modification of aInfo from first action propagates
# to second action, as both delete same key # to second action, as both delete same key
self.assertFalse(self._is_logged("Failed to execute ban")) self.assertNotLogged("Failed to execute ban")
self.assertTrue(self._is_logged("action1 ban deleted aInfo IP")) self.assertLogged("action1 ban deleted aInfo IP")
self.assertTrue(self._is_logged("action2 ban deleted aInfo IP")) self.assertLogged("action2 ban deleted aInfo IP")
self.__actions._Actions__flushBan() self.__actions._Actions__flushBan()
# Will fail if modification of aInfo from first action propagates # Will fail if modification of aInfo from first action propagates
# to second action, as both delete same key # to second action, as both delete same key
self.assertFalse(self._is_logged("Failed to execute unban")) self.assertNotLogged("Failed to execute unban")
self.assertTrue(self._is_logged("action1 unban deleted aInfo IP")) self.assertLogged("action1 unban deleted aInfo IP")
self.assertTrue(self._is_logged("action2 unban deleted aInfo IP")) self.assertLogged("action2 unban deleted aInfo IP")

View File

@ -143,17 +143,17 @@ class CommandActionTest(LogCaptureTestCase):
self.__action.actionunban = "true" self.__action.actionunban = "true"
self.assertEqual(self.__action.actionunban, 'true') self.assertEqual(self.__action.actionunban, 'true')
self.assertFalse(self._is_logged('returned')) self.assertNotLogged('returned')
# no action was actually executed yet # no action was actually executed yet
self.__action.ban({'ip': None}) self.__action.ban({'ip': None})
self.assertTrue(self._is_logged('Invariant check failed')) self.assertLogged('Invariant check failed')
self.assertTrue(self._is_logged('returned successfully')) self.assertLogged('returned successfully')
def testExecuteActionEmptyUnban(self): def testExecuteActionEmptyUnban(self):
self.__action.actionunban = "" self.__action.actionunban = ""
self.__action.unban({}) self.__action.unban({})
self.assertTrue(self._is_logged('Nothing to do')) self.assertLogged('Nothing to do')
def testExecuteActionStartCtags(self): def testExecuteActionStartCtags(self):
self.__action.HOST = "192.0.2.0" self.__action.HOST = "192.0.2.0"
@ -168,7 +168,7 @@ class CommandActionTest(LogCaptureTestCase):
self.__action.actionban = "rm /tmp/fail2ban.test" self.__action.actionban = "rm /tmp/fail2ban.test"
self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]" self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
self.assertRaises(RuntimeError, self.__action.ban, {'ip': None}) self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
self.assertTrue(self._is_logged('Unable to restore environment')) self.assertLogged('Unable to restore environment')
def testExecuteActionChangeCtags(self): def testExecuteActionChangeCtags(self):
self.assertRaises(AttributeError, getattr, self.__action, "ROST") self.assertRaises(AttributeError, getattr, self.__action, "ROST")
@ -187,11 +187,11 @@ class CommandActionTest(LogCaptureTestCase):
def testExecuteActionStartEmpty(self): def testExecuteActionStartEmpty(self):
self.__action.actionstart = "" self.__action.actionstart = ""
self.__action.start() self.__action.start()
self.assertTrue(self._is_logged('Nothing to do')) self.assertLogged('Nothing to do')
def testExecuteIncorrectCmd(self): def testExecuteIncorrectCmd(self):
CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null') CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')
self.assertTrue(self._is_logged('HINT on 127: "Command not found"')) self.assertLogged('HINT on 127: "Command not found"')
def testExecuteTimeout(self): def testExecuteTimeout(self):
stime = time.time() stime = time.time()
@ -199,9 +199,11 @@ class CommandActionTest(LogCaptureTestCase):
self.assertFalse(CommandAction.executeCmd('sleep 60', timeout=2)) self.assertFalse(CommandAction.executeCmd('sleep 60', timeout=2))
# give a test still 1 second, because system could be too busy # give a test still 1 second, because system could be too busy
self.assertTrue(time.time() >= stime + 2 and time.time() <= stime + 3) self.assertTrue(time.time() >= stime + 2 and time.time() <= stime + 3)
self.assertTrue(self._is_logged('sleep 60 -- timed out after 2 seconds') self.assertLogged(
or self._is_logged('sleep 60 -- timed out after 3 seconds')) 'sleep 60 -- timed out after 2 seconds',
self.assertTrue(self._is_logged('sleep 60 -- killed with SIGTERM')) 'sleep 60 -- timed out after 3 seconds'
)
self.assertLogged('sleep 60 -- killed with SIGTERM')
def testExecuteTimeoutWithNastyChildren(self): def testExecuteTimeoutWithNastyChildren(self):
# temporary file for a nasty kid shell script # temporary file for a nasty kid shell script
@ -225,16 +227,16 @@ class CommandActionTest(LogCaptureTestCase):
'bash %s' % tmpFilename, timeout=.1)) 'bash %s' % tmpFilename, timeout=.1))
# Verify that the process itself got killed # Verify that the process itself got killed
self.assertFalse(pid_exists(getnastypid())) # process should have been killed self.assertFalse(pid_exists(getnastypid())) # process should have been killed
self.assertTrue(self._is_logged('timed out')) self.assertLogged('timed out')
self.assertTrue(self._is_logged('killed with SIGTERM')) self.assertLogged('killed with SIGTERM')
# A bit evolved case even though, previous test already tests killing children processes # A bit evolved case even though, previous test already tests killing children processes
self.assertFalse(CommandAction.executeCmd( self.assertFalse(CommandAction.executeCmd(
'out=`bash %s`; echo ALRIGHT' % tmpFilename, timeout=.2)) 'out=`bash %s`; echo ALRIGHT' % tmpFilename, timeout=.2))
# Verify that the process itself got killed # Verify that the process itself got killed
self.assertFalse(pid_exists(getnastypid())) self.assertFalse(pid_exists(getnastypid()))
self.assertTrue(self._is_logged('timed out')) self.assertLogged('timed out')
self.assertTrue(self._is_logged('killed with SIGTERM')) self.assertLogged('killed with SIGTERM')
os.unlink(tmpFilename) os.unlink(tmpFilename)
os.unlink(tmpFilename + '.pid') os.unlink(tmpFilename + '.pid')
@ -242,11 +244,11 @@ class CommandActionTest(LogCaptureTestCase):
def testCaptureStdOutErr(self): def testCaptureStdOutErr(self):
CommandAction.executeCmd('echo "How now brown cow"') CommandAction.executeCmd('echo "How now brown cow"')
self.assertTrue(self._is_logged("'How now brown cow\\n'")) self.assertLogged("'How now brown cow\\n'")
CommandAction.executeCmd( CommandAction.executeCmd(
'echo "The rain in Spain stays mainly in the plain" 1>&2') 'echo "The rain in Spain stays mainly in the plain" 1>&2')
self.assertTrue(self._is_logged( self.assertLogged(
"'The rain in Spain stays mainly in the plain\\n'")) "'The rain in Spain stays mainly in the plain\\n'")
def testCallingMap(self): def testCallingMap(self):
mymap = CallingMap(callme=lambda: str(10), error=lambda: int('a'), mymap = CallingMap(callme=lambda: str(10), error=lambda: int('a'),

View File

@ -173,16 +173,16 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(jail.read()) self.assertTrue(jail.read())
self.assertTrue(jail.getOptions()) self.assertTrue(jail.getOptions())
self.assertTrue(jail.isEnabled()) self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged('No filter set for jail emptyaction')) self.assertLogged('No filter set for jail emptyaction')
self.assertTrue(self._is_logged('No actions were defined for emptyaction')) self.assertLogged('No actions were defined for emptyaction')
def testJailActionFilterMissing(self): def testJailActionFilterMissing(self):
jail = JailReader('missingbitsjail', basedir=IMPERFECT_CONFIG, share_config = self.__share_cfg) jail = JailReader('missingbitsjail', basedir=IMPERFECT_CONFIG, share_config = self.__share_cfg)
self.assertTrue(jail.read()) self.assertTrue(jail.read())
self.assertFalse(jail.getOptions()) self.assertFalse(jail.getOptions())
self.assertTrue(jail.isEnabled()) self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG)) self.assertLogged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG)
self.assertTrue(self._is_logged('Unable to read the filter')) self.assertLogged('Unable to read the filter')
def testJailActionBrokenDef(self): def testJailActionBrokenDef(self):
jail = JailReader('brokenactiondef', basedir=IMPERFECT_CONFIG, jail = JailReader('brokenactiondef', basedir=IMPERFECT_CONFIG,
@ -190,13 +190,13 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(jail.read()) self.assertTrue(jail.read())
self.assertFalse(jail.getOptions()) self.assertFalse(jail.getOptions())
self.assertTrue(jail.isEnabled()) self.assertTrue(jail.isEnabled())
self.assertTrue(self._is_logged('Error in action definition joho[foo')) self.assertLogged('Error in action definition joho[foo')
# This unittest has been deactivated for some time... # This unittest has been deactivated for some time...
# self.assertTrue(self._is_logged( # self.assertLogged(
# 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0')) # 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0')
# let's test for what is actually logged and handle changes in the future # let's test for what is actually logged and handle changes in the future
self.assertTrue(self._is_logged( self.assertLogged(
"Caught exception: 'NoneType' object has no attribute 'endswith'")) "Caught exception: 'NoneType' object has no attribute 'endswith'")
if STOCK: if STOCK:
def testStockSSHJail(self): def testStockSSHJail(self):
@ -221,7 +221,7 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is['s']")) self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is['s']"))
#self.printLog() #self.printLog()
#self.assertTrue(self._is_logged("Invalid argument ['s'] in ''s''")) #self.assertLogged("Invalid argument ['s'] in ''s''")
self.assertEqual(('mail', {'a': ','}), JailReader.extractOptions("mail[a=',']")) self.assertEqual(('mail', {'a': ','}), JailReader.extractOptions("mail[a=',']"))
@ -265,7 +265,7 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(JailReader._glob(os.path.join(d, '*')), [f1]) self.assertEqual(JailReader._glob(os.path.join(d, '*')), [f1])
# since f2 is dangling -- empty list # since f2 is dangling -- empty list
self.assertEqual(JailReader._glob(f2), []) self.assertEqual(JailReader._glob(f2), [])
self.assertTrue(self._is_logged('File %s is a dangling link, thus cannot be monitored' % f2)) self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), []) self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
os.remove(f1) os.remove(f1)
os.remove(f2) os.remove(f2)
@ -463,8 +463,8 @@ class JailsReaderTest(LogCaptureTestCase):
['start', 'missinglogfiles'], ['start', 'missinglogfiles'],
['start', 'brokenaction'], ['start', 'brokenaction'],
['start', 'parse_to_end_of_jail.conf'],])) ['start', 'parse_to_end_of_jail.conf'],]))
self.assertTrue(self._is_logged("Errors in jail 'missingbitsjail'. Skipping...")) self.assertLogged("Errors in jail 'missingbitsjail'. Skipping...")
self.assertTrue(self._is_logged("No file(s) found for glob /weapons/of/mass/destruction")) self.assertLogged("No file(s) found for glob /weapons/of/mass/destruction")
if STOCK: if STOCK:
def testReadStockActionConf(self): def testReadStockActionConf(self):
@ -496,7 +496,7 @@ class JailsReaderTest(LogCaptureTestCase):
#old_comm_commands = comm_commands[:] # make a copy #old_comm_commands = comm_commands[:] # make a copy
#self.assertRaises(ValueError, jails.getOptions, "BOGUS") #self.assertRaises(ValueError, jails.getOptions, "BOGUS")
#self.printLog() #self.printLog()
#self.assertTrue(self._is_logged("No section: 'BOGUS'")) #self.assertLogged("No section: 'BOGUS'")
## and there should be no side-effects ## and there should be no side-effects
#self.assertEqual(jails.convert(), old_comm_commands) #self.assertEqual(jails.convert(), old_comm_commands)

View File

@ -316,7 +316,7 @@ class DatabaseTest(LogCaptureTestCase):
ticket.setAttempt(5) ticket.setAttempt(5)
self.jail.putFailTicket(ticket) self.jail.putFailTicket(ticket)
actions._Actions__checkBan() actions._Actions__checkBan()
self.assertTrue(self._is_logged("ban ainfo %s, %s, %s, %s" % (True, True, True, True))) self.assertLogged("ban ainfo %s, %s, %s, %s" % (True, True, True, True))
def testPurge(self): def testPurge(self):
if Fail2BanDb is None: # pragma: no cover if Fail2BanDb is None: # pragma: no cover

View File

@ -260,14 +260,14 @@ class IgnoreIP(LogCaptureTestCase):
self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addFailRegex('<HOST>') self.filter.addFailRegex('<HOST>')
self.filter.processLineAndAdd('1387203300.222 192.168.1.32') self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
self.assertTrue(self._is_logged('Ignore 192.168.1.32')) self.assertLogged('Ignore 192.168.1.32')
tearDownMyTime() tearDownMyTime()
def testIgnoreAddBannedIP(self): def testIgnoreAddBannedIP(self):
self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addBannedIP('192.168.1.32') self.filter.addBannedIP('192.168.1.32')
self.assertFalse(self._is_logged('Ignore 192.168.1.32')) self.assertNotLogged('Ignore 192.168.1.32')
self.assertTrue(self._is_logged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.')) self.assertLogged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.')
def testIgnoreCommand(self): def testIgnoreCommand(self):
self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>")) self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>"))
@ -278,11 +278,11 @@ class IgnoreIP(LogCaptureTestCase):
ip = "93.184.216.34" ip = "93.184.216.34"
for ignore_source in ["dns", "ip", "command"]: for ignore_source in ["dns", "ip", "command"]:
self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source) self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
self.assertTrue(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))) self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))
def testIgnoreCauseNOK(self): def testIgnoreCauseNOK(self):
self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED") self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
self.assertFalse(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))) self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
class IgnoreIPDNS(IgnoreIP): class IgnoreIPDNS(IgnoreIP):
@ -382,18 +382,17 @@ class LogFileMonitor(LogCaptureTestCase):
def testNoLogFile(self): def testNoLogFile(self):
_killfile(self.file, self.name) _killfile(self.file, self.name)
self.filter.getFailures(self.name) self.filter.getFailures(self.name)
failure_was_logged = self._is_logged('Unable to open %s' % self.name) self.assertLogged('Unable to open %s' % self.name)
self.assertTrue(failure_was_logged)
def testRemovingFailRegex(self): def testRemovingFailRegex(self):
self.filter.delFailRegex(0) self.filter.delFailRegex(0)
self.assertFalse(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid')
self.filter.delFailRegex(0) self.filter.delFailRegex(0)
self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
def testRemovingIgnoreRegex(self): def testRemovingIgnoreRegex(self):
self.filter.delIgnoreRegex(0) self.filter.delIgnoreRegex(0)
self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
def testNewChangeViaIsModified(self): def testNewChangeViaIsModified(self):
# it is a brand new one -- so first we think it is modified # it is a brand new one -- so first we think it is modified

View File

@ -934,7 +934,7 @@ class LoggingTests(LogCaptureTestCase):
badThread = _BadThread() badThread = _BadThread()
badThread.start() badThread.start()
badThread.join() badThread.join()
self.assertTrue(self._is_logged("Unhandled exception")) self.assertLogged("Unhandled exception")
finally: finally:
sys.__excepthook__ = prev_exchook sys.__excepthook__ = prev_exchook
self.assertEqual(len(x), 1) self.assertEqual(len(x), 1)

View File

@ -232,6 +232,39 @@ class LogCaptureTestCase(unittest.TestCase):
def _is_logged(self, s): def _is_logged(self, s):
return s in self._log.getvalue() return s in self._log.getvalue()
def assertLogged(self, *s):
"""Assert that one of the strings was logged
Preferable to assertTrue(self._is_logged(..)))
since provides message with the actual log.
Parameters
----------
s : string or list/set/tuple of strings
Test should succeed if string (or any of the listed) is present in the log
"""
logged = self._log.getvalue()
for s_ in s:
if s_ in logged:
return
raise AssertionError("None among %r was found in the log: %r" % (s, logged))
def assertNotLogged(self, *s):
"""Assert that strings were not logged
Parameters
----------
s : string or list/set/tuple of strings
Test should succeed if the string (or at least one of the listed) is not
present in the log
"""
logged = self._log.getvalue()
for s_ in s:
if s_ not in logged:
return
raise AssertionError("All of the %r were found present in the log: %r" % (s, logged))
def getLog(self): def getLog(self):
return self._log.getvalue() return self._log.getvalue()