Browse Source

ENH: reincarnated (some) servertestcases with a new thorough test

new test creates a simple jail and makes sure if bans ip and has
somewhat proper <substitutions> in the taken actions
_tent/expose_banned_ips
Yaroslav Halchenko 13 years ago
parent
commit
3f62546231
  1. 4
      fail2ban-testcases
  2. 80
      testcases/servertestcase.py

4
fail2ban-testcases

@ -67,8 +67,8 @@ tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
# Server
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
#tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
## tests.addTest(unittest.makeSuite(servertestcase.StartStop))
tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
# FailManager
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))

80
testcases/servertestcase.py

@ -27,24 +27,31 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import unittest, socket, time
import unittest, socket, time, os.path
import tempfile
from server.server import Server
from server.jails import UnknownJailException
class StartStop(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__server = Server()
self.__sock = tempfile.NamedTemporaryFile(delete=False).name
self.__server.setLogLevel(0)
self.__server.start(False)
self.__server.start(self.__sock, False)
def tearDown(self):
"""Call after every test case."""
self.assertTrue(os.path.exists(self.__sock))
self.__server.quit()
# sock must have been removed as well
self.assertTrue(not os.path.exists(self.__sock))
def testStartStopJail(self):
name = "TestCase"
self.__server.addJail(name)
backend = 'polling'
self.__server.addJail(name, backend)
self.__server.startJail(name)
time.sleep(1)
self.__server.stopJail(name)
@ -55,14 +62,15 @@ class Transmitter(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__server = Server()
self.__sock = tempfile.NamedTemporaryFile(delete=False).name
self.__server.setLogLevel(0)
self.__server.start(False)
self.__server.start(self.__sock, False)
def tearDown(self):
"""Call after every test case."""
self.__server.quit()
def testSetActionOK(self):
def __testSetActionOK(self):
name = "TestCase"
cmdList = [["add", name],
["set", name, "actionstart", "Action Start"],
@ -85,7 +93,7 @@ class Transmitter(unittest.TestCase):
self.assertEqual(self.__server.transm.proceed(cmd), outList[cnt])
cnt += 1
def testSetActionNOK(self):
def __testSetActionNOK(self):
name = "TestCase"
cmdList = [["addd", name],
["set", name, "test"],
@ -109,12 +117,12 @@ class Transmitter(unittest.TestCase):
self.assertEqual(msg[0], outList[cnt])
cnt += 1
def testJail(self):
def __testJail(self):
name = "TestCase"
cmdList = [["add", name],
["set", name, "logpath", "testcases/files/testcase01.log"],
["set", name, "timeregex", "\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}"],
["set", name, "timepattern", "%b %d %H:%M:%S"],
["set", name, "addlogpath", "testcases/files/testcase01.log"],
#["set", name, "timeregex", "\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}"],
#["set", name, "timepattern", "%b %d %H:%M:%S"],
["set", name, "failregex", "Authentication failure"],
["start", name],
["stop", name],
@ -124,7 +132,55 @@ class Transmitter(unittest.TestCase):
self.__server.transm.proceed(cmd)
if cmd == ["start", name]:
time.sleep(2)
jail = self.__server.jails[name]
jail = self.__server.jails.get(name)
self.assertEqual(jail.getFilter().getFailManager().size(), 0)
self.assertEqual(jail.getAction().getBanManager().size(), 2)
def testJailWithActions(self):
name = "TestCase2"
aname = "TestAction2"
infos = '<ip>|<time>|<failures>|<banned_ips>|<num_banned_ips>'
logfile = "/tmp/fail2ban-tests.log"
outfile = "/tmp/fail2ban-tests.out"
cmdList = [["add", name, "polling"],
["set", "logtarget", "/tmp/fail2ban-tests.log"],
["set", "loglevel", "4"],
["set", "opentail", "False"],
["set", name, "addlogpath", "testcases/files/testcase01.log"],
#["set", name, "addlogpath", "testcases/files/testcase02.log"],
["set", name, "maxretry", 1],
["set", name, "addaction", aname],
["set", name, "addfailregex", ".*Authentication failure for .* from <HOST>\s*$"],
#["set", name, "addfailregex", ".*Failed .* for .* from <HOST> port .* ssh2\s*$"],
["set", name, "actionstart", aname, "rm -f %s; touch %s" % (outfile, outfile)],
["set", name, "actionstop", aname, "echo 'END' >> %s" % outfile],
["set", name, "actioncheck", aname, "[ -e %s ]" % outfile],
["set", name, "actionban", aname, "echo '+%s' >> %s" % (infos, outfile)],
["set", name, "actionunban", aname, "echo '-%s' >> %s" % (infos, outfile)],
["start", name],
["stop", name],
]
for cmd in cmdList:
out = self.__server.getTransm().proceed(cmd)
self.assertTrue(not out[0], msg="Got %s for %s" % (out, cmd))
if cmd == ["start", name]:
time.sleep(3)
jail = self.__server.getJails().get(name)
self.assertEqual(jail.getFilter().failManager.size(), 0)
self.assertEqual(jail.getAction().banManager.size(), 2)
self.assertEqual(jail.getAction().banManager.size(), 1)
# we are done -- jail must be stopped by now
time.sleep(0.5)
# test
self.assertRaises(UnknownJailException, self.__server.getJails().get, name)
# and we should have banned sample IP -- but counts of other
# bans would be 0
self.assertEqual(["+193.168.0.128|1124013600|3||0\n",
"-193.168.0.128|1124013600|3||0\n",
"END\n"],
open(outfile).readlines())
# now remove the files if everything was alright
for f in [logfile, outfile]:
os.unlink(f)

Loading…
Cancel
Save