From 43c0f3cdc43a31cfdf42b12d148085f3bce0dea0 Mon Sep 17 00:00:00 2001 From: sebres Date: Tue, 3 May 2016 10:38:33 +0200 Subject: [PATCH] test cases extended --- fail2ban/tests/servertestcase.py | 60 +++++++++++++++++++++++--------- fail2ban/tests/utils.py | 23 ++++++++---- 2 files changed, 60 insertions(+), 23 deletions(-) diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index 99afa012..bac64dd1 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -1001,7 +1001,6 @@ class ServerConfigReaderTests(LogCaptureTestCase): if STOCK: def testCheckStockJailActions(self): - return jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=self.__share_cfg) # we are running tests from root project dir atm self.assertTrue(jails.read()) # opens fine self.assertTrue(jails.getOptions()) # reads fine @@ -1020,6 +1019,9 @@ class ServerConfigReaderTests(LogCaptureTestCase): # change to the fast init backend: if cmd[0] == 'add': cmd[2] = 'polling' + # change log path to test log of jail (to prevent "Permission denied" on /var/logs/ for test-user): + elif len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addlogpath': + cmd[3] = os.path.join(TEST_FILES_DIR, 'logs', cmd[1]) # add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all): # [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ... elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addfailregex': @@ -1049,11 +1051,6 @@ class ServerConfigReaderTests(LogCaptureTestCase): stream.extend(action.convert()) return stream - def _assertLoggedAllTests(self, tests): - for t in tests: - self.assertLogged(t) - - def testCheckStockCommandActions(self): server = TestServer() transm = server._Server__transm @@ -1134,6 +1131,37 @@ class ServerConfigReaderTests(LogCaptureTestCase): r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", ), }), + ('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', { + 'ip4': ' f2b-j-w-iptables-ipset ', 'ip6': ' f2b-j-w-iptables-ipset6 ', + 'start': ( + "`ipset create f2b-j-w-iptables-ipset hash:ip timeout 600`", + "`iptables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`", + "`ipset create f2b-j-w-iptables-ipset6 hash:ip timeout 600 family inet6`", + "`ip6tables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`", + ), + 'stop': ( + "`iptables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`", + "`ipset flush f2b-j-w-iptables-ipset`", + "`ipset destroy f2b-j-w-iptables-ipset`", + "`ip6tables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`", + "`ipset flush f2b-j-w-iptables-ipset6`", + "`ipset destroy f2b-j-w-iptables-ipset6`", + ), + 'ip4-check': (), + 'ip6-check': (), + 'ip4-ban': ( + r"`ipset add f2b-j-w-iptables-ipset 192.0.2.1 timeout 600 -exist`", + ), + 'ip4-unban': ( + r"`ipset del f2b-j-w-iptables-ipset 192.0.2.1 -exist`", + ), + 'ip6-ban': ( + r"`ipset add f2b-j-w-iptables-ipset6 2001:db8:: timeout 600 -exist`", + ), + 'ip6-unban': ( + r"`ipset del f2b-j-w-iptables-ipset6 2001:db8:: -exist`", + ), + }), ) for jail, act, tests in testJailsActions: @@ -1144,11 +1172,9 @@ class ServerConfigReaderTests(LogCaptureTestCase): # filter all start commands (we want not start all jails): for cmd in stream: - # command to server, use cmdHandler direct instead of `transm.proceed(cmd)`: - try: - cmdHandler(cmd) - except Exception, e: # pragma: no cover - self.fail("Command %r has failed. Received %r" % (cmd, e)) + # command to server: + ret, res = transm.proceed(cmd) + self.assertEqual(ret, 0) jails = server._Server__jails @@ -1165,29 +1191,29 @@ class ServerConfigReaderTests(LogCaptureTestCase): # test start : logSys.debug('# === start ==='); self.pruneLog() action.start() - self._assertLoggedAllTests(tests['start']) + self.assertLogged(*tests['start'], all=True) # test ban ip4 : logSys.debug('# === ban-ipv4 ==='); self.pruneLog() action.ban({'ip': IPAddr('192.0.2.1')}) - self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-ban']) + self.assertLogged(*tests['ip4-check']+tests['ip4-ban'], all=True) self.assertNotLogged(tests['ip6']) # test unban ip4 : logSys.debug('# === unban ipv4 ==='); self.pruneLog() action.unban({'ip': IPAddr('192.0.2.1')}) - self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-unban']) + self.assertLogged(*tests['ip4-check']+tests['ip4-unban'], all=True) self.assertNotLogged(tests['ip6']) # test ban ip6 : logSys.debug('# === ban ipv6 ==='); self.pruneLog() action.ban({'ip': IPAddr('2001:DB8::')}) - self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-ban']) + self.assertLogged(*tests['ip6-check']+tests['ip6-ban'], all=True) self.assertNotLogged(tests['ip4']) # test unban ip6 : logSys.debug('# === unban ipv6 ==='); self.pruneLog() action.unban({'ip': IPAddr('2001:DB8::')}) - self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-unban']) + self.assertLogged(*tests['ip6-check']+tests['ip6-unban'], all=True) self.assertNotLogged(tests['ip4']) # test stop : logSys.debug('# === stop ==='); self.pruneLog() action.stop() - self._assertLoggedAllTests(tests['stop']) + self.assertLogged(*tests['stop'], all=True) diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index fe3ab783..52097fa5 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -325,7 +325,7 @@ class LogCaptureTestCase(unittest.TestCase): def _is_logged(self, s): return s in self._log.getvalue() - def assertLogged(self, *s): + def assertLogged(self, *s, **kwargs): """Assert that one of the strings was logged Preferable to assertTrue(self._is_logged(..))) @@ -335,12 +335,22 @@ class LogCaptureTestCase(unittest.TestCase): ---------- s : string or list/set/tuple of strings Test should succeed if string (or any of the listed) is present in the log + all : boolean, should find all in s """ logged = self._log.getvalue() - for s_ in s: - if s_ in logged: - return - raise AssertionError("None among %r was found in the log: %r" % (s, logged)) + if not kwargs.get('all', False): + # at least one entry should be found: + for s_ in s: + if s_ in logged: + return + # pragma: no cover + self.fail("None among %r was found in the log: ===\n%s===" % (s, logged)) + else: + # each entry should be found: + for s_ in s: + if s_ not in logged: + # pragma: no cover + self.fail("%r was not found in the log: ===\n%s===" % (s_, logged)) def assertNotLogged(self, *s): """Assert that strings were not logged @@ -355,7 +365,8 @@ class LogCaptureTestCase(unittest.TestCase): for s_ in s: if s_ not in logged: return - raise AssertionError("All of the %r were found present in the log: %r" % (s, logged)) + # pragma: no cover + self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged)) def pruneLog(self): self._log.truncate(0)