diff --git a/fail2ban/tests/actionstestcase.py b/fail2ban/tests/actionstestcase.py index bb295967..0ceb35d5 100644 --- a/fail2ban/tests/actionstestcase.py +++ b/fail2ban/tests/actionstestcase.py @@ -94,15 +94,15 @@ class ExecuteActions(LogCaptureTestCase): "Action", os.path.join(TEST_FILES_DIR, "action.d/action.py"), {'opt1': 'value'}) - self.assertTrue(self._is_logged("TestAction initialised")) + self.assertLogged("TestAction initialised") self.__actions.start() time.sleep(3) - self.assertTrue(self._is_logged("TestAction action start")) + self.assertLogged("TestAction action start") self.__actions.stop() self.__actions.join() - self.assertTrue(self._is_logged("TestAction action stop")) + self.assertLogged("TestAction action stop") self.assertRaises(IOError, self.__actions.add, "Action3", "/does/not/exist.py", {}) @@ -136,10 +136,10 @@ class ExecuteActions(LogCaptureTestCase): {}) self.__actions.start() time.sleep(3) - self.assertTrue(self._is_logged("Failed to start")) + self.assertLogged("Failed to start") self.__actions.stop() self.__actions.join() - self.assertTrue(self._is_logged("Failed to stop")) + self.assertLogged("Failed to stop") def testBanActionsAInfo(self): # Action which deletes IP address from aInfo @@ -155,13 +155,13 @@ class ExecuteActions(LogCaptureTestCase): self.__actions._Actions__checkBan() # Will fail if modification of aInfo from first action propagates # to second action, as both delete same key - self.assertFalse(self._is_logged("Failed to execute ban")) - self.assertTrue(self._is_logged("action1 ban deleted aInfo IP")) - self.assertTrue(self._is_logged("action2 ban deleted aInfo IP")) + self.assertNotLogged("Failed to execute ban") + self.assertLogged("action1 ban deleted aInfo IP") + self.assertLogged("action2 ban deleted aInfo IP") self.__actions._Actions__flushBan() # Will fail if modification of aInfo from first action propagates # to second action, as both delete same key - self.assertFalse(self._is_logged("Failed to execute unban")) - self.assertTrue(self._is_logged("action1 unban deleted aInfo IP")) - self.assertTrue(self._is_logged("action2 unban deleted aInfo IP")) + self.assertNotLogged("Failed to execute unban") + self.assertLogged("action1 unban deleted aInfo IP") + self.assertLogged("action2 unban deleted aInfo IP") diff --git a/fail2ban/tests/actiontestcase.py b/fail2ban/tests/actiontestcase.py index febbc619..a4b453a3 100644 --- a/fail2ban/tests/actiontestcase.py +++ b/fail2ban/tests/actiontestcase.py @@ -143,17 +143,17 @@ class CommandActionTest(LogCaptureTestCase): self.__action.actionunban = "true" self.assertEqual(self.__action.actionunban, 'true') - self.assertFalse(self._is_logged('returned')) + self.assertNotLogged('returned') # no action was actually executed yet self.__action.ban({'ip': None}) - self.assertTrue(self._is_logged('Invariant check failed')) - self.assertTrue(self._is_logged('returned successfully')) + self.assertLogged('Invariant check failed') + self.assertLogged('returned successfully') def testExecuteActionEmptyUnban(self): self.__action.actionunban = "" self.__action.unban({}) - self.assertTrue(self._is_logged('Nothing to do')) + self.assertLogged('Nothing to do') def testExecuteActionStartCtags(self): self.__action.HOST = "192.0.2.0" @@ -168,7 +168,7 @@ class CommandActionTest(LogCaptureTestCase): self.__action.actionban = "rm /tmp/fail2ban.test" self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]" self.assertRaises(RuntimeError, self.__action.ban, {'ip': None}) - self.assertTrue(self._is_logged('Unable to restore environment')) + self.assertLogged('Unable to restore environment') def testExecuteActionChangeCtags(self): self.assertRaises(AttributeError, getattr, self.__action, "ROST") @@ -187,11 +187,11 @@ class CommandActionTest(LogCaptureTestCase): def testExecuteActionStartEmpty(self): self.__action.actionstart = "" self.__action.start() - self.assertTrue(self._is_logged('Nothing to do')) + self.assertLogged('Nothing to do') def testExecuteIncorrectCmd(self): CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null') - self.assertTrue(self._is_logged('HINT on 127: "Command not found"')) + self.assertLogged('HINT on 127: "Command not found"') def testExecuteTimeout(self): stime = time.time() @@ -200,9 +200,11 @@ class CommandActionTest(LogCaptureTestCase): RuntimeError, CommandAction.executeCmd, 'sleep 60', timeout=2) # give a test still 1 second, because system could be too busy self.assertTrue(time.time() >= stime + 2 and time.time() <= stime + 3) - self.assertTrue(self._is_logged('sleep 60 -- timed out after 2 seconds') - or self._is_logged('sleep 60 -- timed out after 3 seconds')) - self.assertTrue(self._is_logged('sleep 60 -- killed with SIGTERM')) + self.assertLogged({ + 'sleep 60 -- timed out after 2 seconds', + 'sleep 60 -- timed out after 3 seconds' + }) + self.assertLogged('sleep 60 -- killed with SIGTERM') def testExecuteTimeoutWithNastyChildren(self): # temporary file for a nasty kid shell script @@ -226,8 +228,8 @@ class CommandActionTest(LogCaptureTestCase): RuntimeError, CommandAction.executeCmd, 'bash %s' % tmpFilename, timeout=.1) # Verify that the proccess itself got killed self.assertFalse(pid_exists(getnastypid())) # process should have been killed - self.assertTrue(self._is_logged('timed out')) - self.assertTrue(self._is_logged('killed with SIGTERM')) + self.assertLogged('timed out') + self.assertLogged('killed with SIGTERM') # A bit evolved case even though, previous test already tests killing children processes self.assertRaises( @@ -235,8 +237,8 @@ class CommandActionTest(LogCaptureTestCase): timeout=.2) # Verify that the proccess itself got killed self.assertFalse(pid_exists(getnastypid())) - self.assertTrue(self._is_logged('timed out')) - self.assertTrue(self._is_logged('killed with SIGTERM')) + self.assertLogged('timed out') + self.assertLogged('killed with SIGTERM') os.unlink(tmpFilename) os.unlink(tmpFilename + '.pid') @@ -244,11 +246,11 @@ class CommandActionTest(LogCaptureTestCase): def testCaptureStdOutErr(self): CommandAction.executeCmd('echo "How now brown cow"') - self.assertTrue(self._is_logged("'How now brown cow\\n'")) + self.assertLogged("'How now brown cow\\n'") CommandAction.executeCmd( 'echo "The rain in Spain stays mainly in the plain" 1>&2') - self.assertTrue(self._is_logged( - "'The rain in Spain stays mainly in the plain\\n'")) + self.assertLogged( + "'The rain in Spain stays mainly in the plain\\n'") def testCallingMap(self): mymap = CallingMap(callme=lambda: str(10), error=lambda: int('a'), diff --git a/fail2ban/tests/clientreadertestcase.py b/fail2ban/tests/clientreadertestcase.py index 00128c33..94fe1828 100644 --- a/fail2ban/tests/clientreadertestcase.py +++ b/fail2ban/tests/clientreadertestcase.py @@ -173,16 +173,16 @@ class JailReaderTest(LogCaptureTestCase): self.assertTrue(jail.read()) self.assertTrue(jail.getOptions()) self.assertTrue(jail.isEnabled()) - self.assertTrue(self._is_logged('No filter set for jail emptyaction')) - self.assertTrue(self._is_logged('No actions were defined for emptyaction')) + self.assertLogged('No filter set for jail emptyaction') + self.assertLogged('No actions were defined for emptyaction') def testJailActionFilterMissing(self): jail = JailReader('missingbitsjail', basedir=IMPERFECT_CONFIG, share_config = self.__share_cfg) self.assertTrue(jail.read()) self.assertFalse(jail.getOptions()) self.assertTrue(jail.isEnabled()) - self.assertTrue(self._is_logged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG)) - self.assertTrue(self._is_logged('Unable to read the filter')) + self.assertLogged("Found no accessible config files for 'filter.d/catchallthebadies' under %s" % IMPERFECT_CONFIG) + self.assertLogged('Unable to read the filter') def testJailActionBrokenDef(self): jail = JailReader('brokenactiondef', basedir=IMPERFECT_CONFIG, @@ -190,13 +190,13 @@ class JailReaderTest(LogCaptureTestCase): self.assertTrue(jail.read()) self.assertFalse(jail.getOptions()) self.assertTrue(jail.isEnabled()) - self.assertTrue(self._is_logged('Error in action definition joho[foo')) + self.assertLogged('Error in action definition joho[foo') # This unittest has been deactivated for some time... - # self.assertTrue(self._is_logged( - # 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0')) + # self.assertLogged( + # 'Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0') # let's test for what is actually logged and handle changes in the future - self.assertTrue(self._is_logged( - "Caught exception: 'NoneType' object has no attribute 'endswith'")) + self.assertLogged( + "Caught exception: 'NoneType' object has no attribute 'endswith'") if STOCK: def testStockSSHJail(self): @@ -221,7 +221,7 @@ class JailReaderTest(LogCaptureTestCase): self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is['s']")) #self.printLog() - #self.assertTrue(self._is_logged("Invalid argument ['s'] in ''s''")) + #self.assertLogged("Invalid argument ['s'] in ''s''") self.assertEqual(('mail', {'a': ','}), JailReader.extractOptions("mail[a=',']")) @@ -265,7 +265,7 @@ class JailReaderTest(LogCaptureTestCase): self.assertEqual(JailReader._glob(os.path.join(d, '*')), [f1]) # since f2 is dangling -- empty list self.assertEqual(JailReader._glob(f2), []) - self.assertTrue(self._is_logged('File %s is a dangling link, thus cannot be monitored' % f2)) + self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2) self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), []) os.remove(f1) os.remove(f2) @@ -463,8 +463,8 @@ class JailsReaderTest(LogCaptureTestCase): ['start', 'missinglogfiles'], ['start', 'brokenaction'], ['start', 'parse_to_end_of_jail.conf'],])) - self.assertTrue(self._is_logged("Errors in jail 'missingbitsjail'. Skipping...")) - self.assertTrue(self._is_logged("No file(s) found for glob /weapons/of/mass/destruction")) + self.assertLogged("Errors in jail 'missingbitsjail'. Skipping...") + self.assertLogged("No file(s) found for glob /weapons/of/mass/destruction") if STOCK: def testReadStockActionConf(self): @@ -496,7 +496,7 @@ class JailsReaderTest(LogCaptureTestCase): #old_comm_commands = comm_commands[:] # make a copy #self.assertRaises(ValueError, jails.getOptions, "BOGUS") #self.printLog() - #self.assertTrue(self._is_logged("No section: 'BOGUS'")) + #self.assertLogged("No section: 'BOGUS'") ## and there should be no side-effects #self.assertEqual(jails.convert(), old_comm_commands) diff --git a/fail2ban/tests/databasetestcase.py b/fail2ban/tests/databasetestcase.py index dd813ee6..3d156eda 100644 --- a/fail2ban/tests/databasetestcase.py +++ b/fail2ban/tests/databasetestcase.py @@ -316,7 +316,7 @@ class DatabaseTest(LogCaptureTestCase): ticket.setAttempt(5) self.jail.putFailTicket(ticket) actions._Actions__checkBan() - self.assertTrue(self._is_logged("ban ainfo %s, %s, %s, %s" % (True, True, True, True))) + self.assertLogged("ban ainfo %s, %s, %s, %s" % (True, True, True, True)) def testPurge(self): if Fail2BanDb is None: # pragma: no cover diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 891b55a6..acce9625 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -260,14 +260,14 @@ class IgnoreIP(LogCaptureTestCase): self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addFailRegex('') self.filter.processLineAndAdd('1387203300.222 192.168.1.32') - self.assertTrue(self._is_logged('Ignore 192.168.1.32')) + self.assertLogged('Ignore 192.168.1.32') tearDownMyTime() def testIgnoreAddBannedIP(self): self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addBannedIP('192.168.1.32') - self.assertFalse(self._is_logged('Ignore 192.168.1.32')) - self.assertTrue(self._is_logged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.')) + self.assertNotLogged('Ignore 192.168.1.32') + self.assertLogged('Requested to manually ban an ignored IP 192.168.1.32. User knows best. Proceeding to ban it.') def testIgnoreCommand(self): self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py ")) @@ -278,11 +278,11 @@ class IgnoreIP(LogCaptureTestCase): ip = "93.184.216.34" for ignore_source in ["dns", "ip", "command"]: self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source) - self.assertTrue(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))) + self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)) def testIgnoreCauseNOK(self): self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED") - self.assertFalse(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))) + self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")) class IgnoreIPDNS(IgnoreIP): @@ -382,18 +382,17 @@ class LogFileMonitor(LogCaptureTestCase): def testNoLogFile(self): _killfile(self.file, self.name) self.filter.getFailures(self.name) - failure_was_logged = self._is_logged('Unable to open %s' % self.name) - self.assertTrue(failure_was_logged) + self.assertLogged('Unable to open %s' % self.name) def testRemovingFailRegex(self): self.filter.delFailRegex(0) - self.assertFalse(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) + self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid') self.filter.delFailRegex(0) - self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) + self.assertLogged('Cannot remove regular expression. Index 0 is not valid') def testRemovingIgnoreRegex(self): self.filter.delIgnoreRegex(0) - self.assertTrue(self._is_logged('Cannot remove regular expression. Index 0 is not valid')) + self.assertLogged('Cannot remove regular expression. Index 0 is not valid') def testNewChangeViaIsModified(self): # it is a brand new one -- so first we think it is modified diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index 57453269..6f946cc0 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -934,7 +934,7 @@ class LoggingTests(LogCaptureTestCase): badThread = _BadThread() badThread.start() badThread.join() - self.assertTrue(self._is_logged("Unhandled exception")) + self.assertLogged("Unhandled exception") finally: sys.__excepthook__ = prev_exchook self.assertEqual(len(x), 1) diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index e7993ead..9d50646a 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -232,6 +232,42 @@ class LogCaptureTestCase(unittest.TestCase): def _is_logged(self, s): return s in self._log.getvalue() + def assertLogged(self, s): + """Assert that a string was logged + + Preferable to assertTrue(self._is_logged(..))) + since provides message with the actual log. + + Parameters + ---------- + s : string or list/set/tuple of strings + Test should succeed if string (or any of the listed) is present in the log + """ + logged = self._log.getvalue() + + s_iter = s if isinstance(s, (tuple, list, set)) else [s] + for s_ in s_iter: + if s_ in logged: + return + raise AssertionError("%r was not found in the log: %r" % (s, logged)) + + def assertNotLogged(self, s): + """Assert that a string was not logged + + Parameters + ---------- + s : string or list/set/tuple of strings + Test should succeed if the string (or at least one of the listed) is not present in the log + """ + logged = self._log.getvalue() + + s_iter = s if isinstance(s, (tuple, list, set)) else [s] + for s_ in s_iter: + if s_ not in logged: + return + raise AssertionError("%r was found present in the log: %r" % (s, logged)) + + def getLog(self): return self._log.getvalue()