test cases extended

pull/1414/head
sebres 2016-05-03 10:38:33 +02:00
parent 2497b05abc
commit 43c0f3cdc4
2 changed files with 60 additions and 23 deletions

View File

@ -1001,7 +1001,6 @@ class ServerConfigReaderTests(LogCaptureTestCase):
if STOCK:
def testCheckStockJailActions(self):
return
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=self.__share_cfg) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
@ -1020,6 +1019,9 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# change to the fast init backend:
if cmd[0] == 'add':
cmd[2] = 'polling'
# change log path to test log of jail (to prevent "Permission denied" on /var/logs/ for test-user):
elif len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addlogpath':
cmd[3] = os.path.join(TEST_FILES_DIR, 'logs', cmd[1])
# add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all):
# [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ...
elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addfailregex':
@ -1049,11 +1051,6 @@ class ServerConfigReaderTests(LogCaptureTestCase):
stream.extend(action.convert())
return stream
def _assertLoggedAllTests(self, tests):
for t in tests:
self.assertLogged(t)
def testCheckStockCommandActions(self):
server = TestServer()
transm = server._Server__transm
@ -1134,6 +1131,37 @@ class ServerConfigReaderTests(LogCaptureTestCase):
r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', {
'ip4': ' f2b-j-w-iptables-ipset ', 'ip6': ' f2b-j-w-iptables-ipset6 ',
'start': (
"`ipset create f2b-j-w-iptables-ipset hash:ip timeout 600`",
"`iptables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset create f2b-j-w-iptables-ipset6 hash:ip timeout 600 family inet6`",
"`ip6tables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset`",
"`ipset destroy f2b-j-w-iptables-ipset`",
"`ip6tables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset6`",
"`ipset destroy f2b-j-w-iptables-ipset6`",
),
'ip4-check': (),
'ip6-check': (),
'ip4-ban': (
r"`ipset add f2b-j-w-iptables-ipset 192.0.2.1 timeout 600 -exist`",
),
'ip4-unban': (
r"`ipset del f2b-j-w-iptables-ipset 192.0.2.1 -exist`",
),
'ip6-ban': (
r"`ipset add f2b-j-w-iptables-ipset6 2001:db8:: timeout 600 -exist`",
),
'ip6-unban': (
r"`ipset del f2b-j-w-iptables-ipset6 2001:db8:: -exist`",
),
}),
)
for jail, act, tests in testJailsActions:
@ -1144,11 +1172,9 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# filter all start commands (we want not start all jails):
for cmd in stream:
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:
try:
cmdHandler(cmd)
except Exception, e: # pragma: no cover
self.fail("Command %r has failed. Received %r" % (cmd, e))
# command to server:
ret, res = transm.proceed(cmd)
self.assertEqual(ret, 0)
jails = server._Server__jails
@ -1165,29 +1191,29 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# test start :
logSys.debug('# === start ==='); self.pruneLog()
action.start()
self._assertLoggedAllTests(tests['start'])
self.assertLogged(*tests['start'], all=True)
# test ban ip4 :
logSys.debug('# === ban-ipv4 ==='); self.pruneLog()
action.ban({'ip': IPAddr('192.0.2.1')})
self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-ban'])
self.assertLogged(*tests['ip4-check']+tests['ip4-ban'], all=True)
self.assertNotLogged(tests['ip6'])
# test unban ip4 :
logSys.debug('# === unban ipv4 ==='); self.pruneLog()
action.unban({'ip': IPAddr('192.0.2.1')})
self._assertLoggedAllTests(tests['ip4-check']+tests['ip4-unban'])
self.assertLogged(*tests['ip4-check']+tests['ip4-unban'], all=True)
self.assertNotLogged(tests['ip6'])
# test ban ip6 :
logSys.debug('# === ban ipv6 ==='); self.pruneLog()
action.ban({'ip': IPAddr('2001:DB8::')})
self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-ban'])
self.assertLogged(*tests['ip6-check']+tests['ip6-ban'], all=True)
self.assertNotLogged(tests['ip4'])
# test unban ip6 :
logSys.debug('# === unban ipv6 ==='); self.pruneLog()
action.unban({'ip': IPAddr('2001:DB8::')})
self._assertLoggedAllTests(tests['ip6-check']+tests['ip6-unban'])
self.assertLogged(*tests['ip6-check']+tests['ip6-unban'], all=True)
self.assertNotLogged(tests['ip4'])
# test stop :
logSys.debug('# === stop ==='); self.pruneLog()
action.stop()
self._assertLoggedAllTests(tests['stop'])
self.assertLogged(*tests['stop'], all=True)

View File

@ -325,7 +325,7 @@ class LogCaptureTestCase(unittest.TestCase):
def _is_logged(self, s):
return s in self._log.getvalue()
def assertLogged(self, *s):
def assertLogged(self, *s, **kwargs):
"""Assert that one of the strings was logged
Preferable to assertTrue(self._is_logged(..)))
@ -335,12 +335,22 @@ class LogCaptureTestCase(unittest.TestCase):
----------
s : string or list/set/tuple of strings
Test should succeed if string (or any of the listed) is present in the log
all : boolean, should find all in s
"""
logged = self._log.getvalue()
for s_ in s:
if s_ in logged:
return
raise AssertionError("None among %r was found in the log: %r" % (s, logged))
if not kwargs.get('all', False):
# at least one entry should be found:
for s_ in s:
if s_ in logged:
return
# pragma: no cover
self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
else:
# each entry should be found:
for s_ in s:
if s_ not in logged:
# pragma: no cover
self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))
def assertNotLogged(self, *s):
"""Assert that strings were not logged
@ -355,7 +365,8 @@ class LogCaptureTestCase(unittest.TestCase):
for s_ in s:
if s_ not in logged:
return
raise AssertionError("All of the %r were found present in the log: %r" % (s, logged))
# pragma: no cover
self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
def pruneLog(self):
self._log.truncate(0)