test cases preliminary extended;

pull/1414/head
sebres 9 years ago
parent 25d6cf8dd2
commit 2497b05abc

@ -33,7 +33,9 @@ import sys
import platform
from ..server.failregex import Regex, FailRegex, RegexException
from ..server import actions as _actions
from ..server.server import Server
from ..server.ipdns import IPAddr
from ..server.jail import Jail
from ..server.jailthread import JailThread
from ..server.utils import Utils
@ -49,6 +51,8 @@ except ImportError: # pragma: no cover
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
FAST_BACKEND = "polling"
logSys = getLogger("fail2ban")
class TestServer(Server):
def setLogLevel(self, *args, **kwargs):
@ -963,3 +967,227 @@ class LoggingTests(LogCaptureTestCase):
sys.__excepthook__ = prev_exchook
self.assertEqual(len(x), 1)
self.assertEqual(x[0][0], RuntimeError)
from clientreadertestcase import ActionReader, JailReader, JailsReader, CONFIG_DIR, STOCK
class ServerConfigReaderTests(LogCaptureTestCase):
def __init__(self, *args, **kwargs):
super(ServerConfigReaderTests, self).__init__(*args, **kwargs)
self.__share_cfg = {}
def setUp(self):
"""Call before every test case."""
super(ServerConfigReaderTests, self).setUp()
self._execCmdLst = []
def tearDown(self):
"""Call after every test case."""
super(ServerConfigReaderTests, self).tearDown()
def _executeCmd(self, realCmd, timeout=60):
for l in realCmd.split('\n'):
if not l.startswith('#'):
logSys.debug('exec-cmd: `%s`', l)
else:
logSys.debug(l)
return True
def test_IPAddr(self):
self.assertTrue(IPAddr('192.0.2.1').isIPv4)
self.assertTrue(IPAddr('2001:DB8::').isIPv6)
if STOCK:
def testCheckStockJailActions(self):
return
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=self.__share_cfg) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
stream = jails.convert(allow_no_files=True)
server = TestServer()
transm = server._Server__transm
cmdHandler = transm._Transmitter__commandHandler
# for cmd in stream:
# print(cmd)
# filter all start commands (we want not start all jails):
for cmd in stream:
if cmd[0] != 'start':
# change to the fast init backend:
if cmd[0] == 'add':
cmd[2] = 'polling'
# add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all):
# [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ...
elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addfailregex':
cmd[3] = "DUMMY-REGEX <HOST>"
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:
try:
cmdHandler(cmd)
except Exception, e: # pragma: no cover
self.fail("Command %r has failed. Received %r" % (cmd, e))
# jails = server._Server__jails
# for j in jails:
# print(j, jails[j])
def getDefaultJailStream(self, jail, act):
act = act.replace('%(__name__)s', jail)
actName, actOpt = JailReader.extractOptions(act)
stream = [
['add', jail, 'polling'],
# ['set', jail, 'addfailregex', 'DUMMY-REGEX <HOST>'],
]
action = ActionReader(
actName, jail, actOpt,
share_config=self.__share_cfg, basedir=CONFIG_DIR)
self.assertTrue(action.read())
action.getOptions({})
stream.extend(action.convert())
return stream
def _assertLoggedAllTests(self, tests):
for t in tests:
self.assertLogged(t)
def testCheckStockCommandActions(self):
server = TestServer()
transm = server._Server__transm
cmdHandler = transm._Transmitter__commandHandler
testJailsActions = (
('j-w-iptables-mp', 'iptables-multiport[name=%(__name__)s, bantime="600", port="http,https", protocol="tcp", chain="INPUT"]', {
'ip4': '`iptables ', 'ip6': '`ip6tables ',
'start': (
"`iptables -w -N f2b-j-w-iptables-mp`",
"`iptables -w -A f2b-j-w-iptables-mp -j RETURN`",
"`iptables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`ip6tables -w -N f2b-j-w-iptables-mp`",
"`ip6tables -w -A f2b-j-w-iptables-mp -j RETURN`",
"`ip6tables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`iptables -w -F f2b-j-w-iptables-mp`",
"`iptables -w -X f2b-j-w-iptables-mp`",
"`ip6tables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`ip6tables -w -F f2b-j-w-iptables-mp`",
"`ip6tables -w -X f2b-j-w-iptables-mp`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables-mp 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables-mp -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-mp 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-mp -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
('j-w-iptables-ap', 'iptables-allports[name=%(__name__)s, bantime="600", protocol="tcp", chain="INPUT"]', {
'ip4': '`iptables ', 'ip6': '`ip6tables ',
'start': (
"`iptables -w -N f2b-j-w-iptables-ap`",
"`iptables -w -A f2b-j-w-iptables-ap -j RETURN`",
"`iptables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`ip6tables -w -N f2b-j-w-iptables-ap`",
"`ip6tables -w -A f2b-j-w-iptables-ap -j RETURN`",
"`ip6tables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`iptables -w -F f2b-j-w-iptables-ap`",
"`iptables -w -X f2b-j-w-iptables-ap`",
"`ip6tables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`ip6tables -w -F f2b-j-w-iptables-ap`",
"`ip6tables -w -X f2b-j-w-iptables-ap`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables-ap 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables-ap -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-ap 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
)
for jail, act, tests in testJailsActions:
stream = self.getDefaultJailStream(jail, act)
# for cmd in stream:
# print(cmd)
# filter all start commands (we want not start all jails):
for cmd in stream:
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:
try:
cmdHandler(cmd)
except Exception, e: # pragma: no cover
self.fail("Command %r has failed. Received %r" % (cmd, e))
jails = server._Server__jails
for jail, act, tests in testJailsActions:
# print(jail, jails[jail])
for a in jails[jail].actions:
action = jails[jail].actions[a]
logSys.debug('# ' + ('=' * 50))
logSys.debug('# == %-44s ==', jail + ' - ' + action._name)
logSys.debug('# ' + ('=' * 50))
self.assertTrue(isinstance(action, _actions.CommandAction))
# wrap default command processor:
action.executeCmd = self._executeCmd
# test start :
logSys.debug('# === start ==='); self.pruneLog()
action.start()
self._assertLoggedAllTests(tests['start'])
# test ban ip4 :
logSys.debug('# === ban-ipv4 ==='); self.pruneLog()
action.ban({'ip': IPAddr('192.0.2.1')})
self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-ban'])
self.assertNotLogged(tests['ip6'])
# test unban ip4 :
logSys.debug('# === unban ipv4 ==='); self.pruneLog()
action.unban({'ip': IPAddr('192.0.2.1')})
self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-unban'])
self.assertNotLogged(tests['ip6'])
# test ban ip6 :
logSys.debug('# === ban ipv6 ==='); self.pruneLog()
action.ban({'ip': IPAddr('2001:DB8::')})
self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-ban'])
self.assertNotLogged(tests['ip4'])
# test unban ip6 :
logSys.debug('# === unban ipv6 ==='); self.pruneLog()
action.unban({'ip': IPAddr('2001:DB8::')})
self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-unban'])
self.assertNotLogged(tests['ip4'])
# test stop :
logSys.debug('# === stop ==='); self.pruneLog()
action.stop()
self._assertLoggedAllTests(tests['stop'])

@ -174,6 +174,7 @@ def gatherTests(regexps=None, opts=None):
tests.addTest(unittest.makeSuite(servertestcase.JailTests))
tests.addTest(unittest.makeSuite(servertestcase.RegexTests))
tests.addTest(unittest.makeSuite(servertestcase.LoggingTests))
tests.addTest(unittest.makeSuite(servertestcase.ServerConfigReaderTests))
tests.addTest(unittest.makeSuite(actiontestcase.CommandActionTest))
tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions))
# Ticket, BanTicket, FailTicket
@ -356,6 +357,8 @@ class LogCaptureTestCase(unittest.TestCase):
return
raise AssertionError("All of the %r were found present in the log: %r" % (s, logged))
def pruneLog(self):
self._log.truncate(0)
def getLog(self):
return self._log.getvalue()

Loading…
Cancel
Save