diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index d9db6c47..99afa012 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -33,7 +33,9 @@ import sys import platform from ..server.failregex import Regex, FailRegex, RegexException +from ..server import actions as _actions from ..server.server import Server +from ..server.ipdns import IPAddr from ..server.jail import Jail from ..server.jailthread import JailThread from ..server.utils import Utils @@ -49,6 +51,8 @@ except ImportError: # pragma: no cover TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") FAST_BACKEND = "polling" +logSys = getLogger("fail2ban") + class TestServer(Server): def setLogLevel(self, *args, **kwargs): @@ -963,3 +967,227 @@ class LoggingTests(LogCaptureTestCase): sys.__excepthook__ = prev_exchook self.assertEqual(len(x), 1) self.assertEqual(x[0][0], RuntimeError) + + +from clientreadertestcase import ActionReader, JailReader, JailsReader, CONFIG_DIR, STOCK + +class ServerConfigReaderTests(LogCaptureTestCase): + + def __init__(self, *args, **kwargs): + super(ServerConfigReaderTests, self).__init__(*args, **kwargs) + self.__share_cfg = {} + + def setUp(self): + """Call before every test case.""" + super(ServerConfigReaderTests, self).setUp() + self._execCmdLst = [] + + def tearDown(self): + """Call after every test case.""" + super(ServerConfigReaderTests, self).tearDown() + + def _executeCmd(self, realCmd, timeout=60): + for l in realCmd.split('\n'): + if not l.startswith('#'): + logSys.debug('exec-cmd: `%s`', l) + else: + logSys.debug(l) + return True + + def test_IPAddr(self): + self.assertTrue(IPAddr('192.0.2.1').isIPv4) + self.assertTrue(IPAddr('2001:DB8::').isIPv6) + + if STOCK: + + def testCheckStockJailActions(self): + return + jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=self.__share_cfg) # we are running tests from root project dir atm + self.assertTrue(jails.read()) # opens fine + self.assertTrue(jails.getOptions()) # reads fine + stream = jails.convert(allow_no_files=True) + + server = TestServer() + transm = server._Server__transm + cmdHandler = transm._Transmitter__commandHandler + + # for cmd in stream: + # print(cmd) + + # filter all start commands (we want not start all jails): + for cmd in stream: + if cmd[0] != 'start': + # change to the fast init backend: + if cmd[0] == 'add': + cmd[2] = 'polling' + # add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all): + # [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ... + elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addfailregex': + cmd[3] = "DUMMY-REGEX " + # command to server, use cmdHandler direct instead of `transm.proceed(cmd)`: + try: + cmdHandler(cmd) + except Exception, e: # pragma: no cover + self.fail("Command %r has failed. Received %r" % (cmd, e)) + + # jails = server._Server__jails + # for j in jails: + # print(j, jails[j]) + + def getDefaultJailStream(self, jail, act): + act = act.replace('%(__name__)s', jail) + actName, actOpt = JailReader.extractOptions(act) + stream = [ + ['add', jail, 'polling'], + # ['set', jail, 'addfailregex', 'DUMMY-REGEX '], + ] + action = ActionReader( + actName, jail, actOpt, + share_config=self.__share_cfg, basedir=CONFIG_DIR) + self.assertTrue(action.read()) + action.getOptions({}) + stream.extend(action.convert()) + return stream + + def _assertLoggedAllTests(self, tests): + for t in tests: + self.assertLogged(t) + + + def testCheckStockCommandActions(self): + server = TestServer() + transm = server._Server__transm + cmdHandler = transm._Transmitter__commandHandler + + testJailsActions = ( + ('j-w-iptables-mp', 'iptables-multiport[name=%(__name__)s, bantime="600", port="http,https", protocol="tcp", chain="INPUT"]', { + 'ip4': '`iptables ', 'ip6': '`ip6tables ', + 'start': ( + "`iptables -w -N f2b-j-w-iptables-mp`", + "`iptables -w -A f2b-j-w-iptables-mp -j RETURN`", + "`iptables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`", + "`ip6tables -w -N f2b-j-w-iptables-mp`", + "`ip6tables -w -A f2b-j-w-iptables-mp -j RETURN`", + "`ip6tables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`", + ), + 'stop': ( + "`iptables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`", + "`iptables -w -F f2b-j-w-iptables-mp`", + "`iptables -w -X f2b-j-w-iptables-mp`", + "`ip6tables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`", + "`ip6tables -w -F f2b-j-w-iptables-mp`", + "`ip6tables -w -X f2b-j-w-iptables-mp`", + ), + 'ip4-check': ( + r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""", + ), + 'ip6-check': ( + r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""", + ), + 'ip4-ban': ( + r"`iptables -w -I f2b-j-w-iptables-mp 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`", + ), + 'ip4-unban': ( + r"`iptables -w -D f2b-j-w-iptables-mp -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`", + ), + 'ip6-ban': ( + r"`ip6tables -w -I f2b-j-w-iptables-mp 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", + ), + 'ip6-unban': ( + r"`ip6tables -w -D f2b-j-w-iptables-mp -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", + ), + }), + ('j-w-iptables-ap', 'iptables-allports[name=%(__name__)s, bantime="600", protocol="tcp", chain="INPUT"]', { + 'ip4': '`iptables ', 'ip6': '`ip6tables ', + 'start': ( + "`iptables -w -N f2b-j-w-iptables-ap`", + "`iptables -w -A f2b-j-w-iptables-ap -j RETURN`", + "`iptables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`", + "`ip6tables -w -N f2b-j-w-iptables-ap`", + "`ip6tables -w -A f2b-j-w-iptables-ap -j RETURN`", + "`ip6tables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`", + ), + 'stop': ( + "`iptables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`", + "`iptables -w -F f2b-j-w-iptables-ap`", + "`iptables -w -X f2b-j-w-iptables-ap`", + "`ip6tables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`", + "`ip6tables -w -F f2b-j-w-iptables-ap`", + "`ip6tables -w -X f2b-j-w-iptables-ap`", + ), + 'ip4-check': ( + r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""", + ), + 'ip6-check': ( + r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""", + ), + 'ip4-ban': ( + r"`iptables -w -I f2b-j-w-iptables-ap 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`", + ), + 'ip4-unban': ( + r"`iptables -w -D f2b-j-w-iptables-ap -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`", + ), + 'ip6-ban': ( + r"`ip6tables -w -I f2b-j-w-iptables-ap 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", + ), + 'ip6-unban': ( + r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", + ), + }), + ) + + for jail, act, tests in testJailsActions: + stream = self.getDefaultJailStream(jail, act) + + # for cmd in stream: + # print(cmd) + + # filter all start commands (we want not start all jails): + for cmd in stream: + # command to server, use cmdHandler direct instead of `transm.proceed(cmd)`: + try: + cmdHandler(cmd) + except Exception, e: # pragma: no cover + self.fail("Command %r has failed. Received %r" % (cmd, e)) + + jails = server._Server__jails + + for jail, act, tests in testJailsActions: + # print(jail, jails[jail]) + for a in jails[jail].actions: + action = jails[jail].actions[a] + logSys.debug('# ' + ('=' * 50)) + logSys.debug('# == %-44s ==', jail + ' - ' + action._name) + logSys.debug('# ' + ('=' * 50)) + self.assertTrue(isinstance(action, _actions.CommandAction)) + # wrap default command processor: + action.executeCmd = self._executeCmd + # test start : + logSys.debug('# === start ==='); self.pruneLog() + action.start() + self._assertLoggedAllTests(tests['start']) + # test ban ip4 : + logSys.debug('# === ban-ipv4 ==='); self.pruneLog() + action.ban({'ip': IPAddr('192.0.2.1')}) + self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-ban']) + self.assertNotLogged(tests['ip6']) + # test unban ip4 : + logSys.debug('# === unban ipv4 ==='); self.pruneLog() + action.unban({'ip': IPAddr('192.0.2.1')}) + self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-unban']) + self.assertNotLogged(tests['ip6']) + # test ban ip6 : + logSys.debug('# === ban ipv6 ==='); self.pruneLog() + action.ban({'ip': IPAddr('2001:DB8::')}) + self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-ban']) + self.assertNotLogged(tests['ip4']) + # test unban ip6 : + logSys.debug('# === unban ipv6 ==='); self.pruneLog() + action.unban({'ip': IPAddr('2001:DB8::')}) + self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-unban']) + self.assertNotLogged(tests['ip4']) + # test stop : + logSys.debug('# === stop ==='); self.pruneLog() + action.stop() + self._assertLoggedAllTests(tests['stop']) + diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index 4c429105..fe3ab783 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -174,6 +174,7 @@ def gatherTests(regexps=None, opts=None): tests.addTest(unittest.makeSuite(servertestcase.JailTests)) tests.addTest(unittest.makeSuite(servertestcase.RegexTests)) tests.addTest(unittest.makeSuite(servertestcase.LoggingTests)) + tests.addTest(unittest.makeSuite(servertestcase.ServerConfigReaderTests)) tests.addTest(unittest.makeSuite(actiontestcase.CommandActionTest)) tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions)) # Ticket, BanTicket, FailTicket @@ -356,6 +357,8 @@ class LogCaptureTestCase(unittest.TestCase): return raise AssertionError("All of the %r were found present in the log: %r" % (s, logged)) + def pruneLog(self): + self._log.truncate(0) def getLog(self): return self._log.getvalue()