diff --git a/fail2ban/tests/banmanagertestcase.py b/fail2ban/tests/banmanagertestcase.py index 8c58994c..017e7cf3 100644 --- a/fail2ban/tests/banmanagertestcase.py +++ b/fail2ban/tests/banmanagertestcase.py @@ -29,11 +29,13 @@ import unittest from ..server.banmanager import BanManager from ..server.ticket import BanTicket from .utils import assert_dict_equal +from ..server.filter import IPAddr + class AddFailure(unittest.TestCase): def setUp(self): """Call before every test case.""" - self.__ticket = BanTicket('193.168.0.128', 1167605999.0) + self.__ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0) self.__banManager = BanManager() self.assertTrue(self.__banManager.addBanTicket(self.__ticket)) @@ -49,18 +51,18 @@ class AddFailure(unittest.TestCase): self.assertEqual(self.__banManager.size(), 1) def testInListOK(self): - ticket = BanTicket('193.168.0.128', 1167605999.0) + ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0) self.assertTrue(self.__banManager._inBanList(ticket)) def testInListNOK(self): - ticket = BanTicket('111.111.1.111', 1167605999.0) + ticket = BanTicket(IPAddr('111.111.1.111'), 1167605999.0) self.assertFalse(self.__banManager._inBanList(ticket)) class StatusExtendedCymruInfo(unittest.TestCase): def setUp(self): """Call before every test case.""" - self.__ban_ip = "93.184.216.34" + self.__ban_ip = IPAddr("93.184.216.34") self.__asn = "15133" self.__country = "EU" self.__rir = "ripencc" diff --git a/fail2ban/tests/databasetestcase.py b/fail2ban/tests/databasetestcase.py index 3d156eda..0aaa7aa7 100644 --- a/fail2ban/tests/databasetestcase.py +++ b/fail2ban/tests/databasetestcase.py @@ -29,7 +29,7 @@ import tempfile import sqlite3 import shutil -from ..server.filter import FileContainer +from ..server.filter import FileContainer, IPAddr from ..server.mytime import MyTime from ..server.ticket import FailTicket from ..server.actions import Actions @@ -97,7 +97,7 @@ class DatabaseTest(LogCaptureTestCase): self.db = Fail2BanDb(self.dbFilename) self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets'])) self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2BanDb_pUlZJh.log'])) - ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n"]) + ticket = FailTicket(IPAddr("127.0.0.1"), 1388009242.26, [u"abc\n"]) self.assertEqual(self.db.getBans()[0], ticket) self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2BanDb.__version__) @@ -171,7 +171,7 @@ class DatabaseTest(LogCaptureTestCase): if Fail2BanDb is None: # pragma: no cover return self.testAddJail() - ticket = FailTicket("127.0.0.1", 0, ["abc\n"]) + ticket = FailTicket(IPAddr("127.0.0.1"), 0, ["abc\n"]) self.db.addBan(self.jail, ticket) self.assertEqual(len(self.db.getBans(jail=self.jail)), 1) @@ -184,9 +184,9 @@ class DatabaseTest(LogCaptureTestCase): self.testAddJail() # invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ... tickets = [ - FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), - FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), - FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')]) + FailTicket(IPAddr("127.0.0.1"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), + FailTicket(IPAddr("127.0.0.2"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), + FailTicket(IPAddr("127.0.0.3"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')]) ] self.db.addBan(self.jail, tickets[0]) self.db.addBan(self.jail, tickets[1]) @@ -197,15 +197,15 @@ class DatabaseTest(LogCaptureTestCase): ## python 2 or 3 : invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace') self.assertTrue( - readtickets[0] == FailTicket("127.0.0.1", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) + readtickets[0] == FailTicket(IPAddr("127.0.0.1"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) or readtickets[0] == tickets[0] ) self.assertTrue( - readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')]) + readtickets[1] == FailTicket(IPAddr("127.0.0.2"), 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')]) or readtickets[1] == tickets[1] ) self.assertTrue( - readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) + readtickets[2] == FailTicket(IPAddr("127.0.0.3"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) or readtickets[2] == tickets[2] ) @@ -220,9 +220,9 @@ class DatabaseTest(LogCaptureTestCase): return self.testAddJail() self.db.addBan( - self.jail, FailTicket("127.0.0.1", MyTime.time() - 60, ["abc\n"])) + self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 60, ["abc\n"])) self.db.addBan( - self.jail, FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"])) + self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"])) self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=50)), 1) self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=20)), 0) # Negative values are for persistent bans, and such all bans should @@ -237,27 +237,27 @@ class DatabaseTest(LogCaptureTestCase): jail2 = DummyJail() self.db.addJail(jail2) - ticket = FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"]) + ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"]) ticket.setAttempt(10) self.db.addBan(self.jail, ticket) - ticket = FailTicket("127.0.0.1", MyTime.time() - 30, ["123\n"]) + ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 30, ["123\n"]) ticket.setAttempt(20) self.db.addBan(self.jail, ticket) - ticket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"]) + ticket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"]) ticket.setAttempt(30) self.db.addBan(self.jail, ticket) - ticket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"]) + ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"]) ticket.setAttempt(40) self.db.addBan(jail2, ticket) # All for IP 127.0.0.1 - ticket = self.db.getBansMerged("127.0.0.1") + ticket = self.db.getBansMerged(IPAddr("127.0.0.1")) self.assertEqual(ticket.getIP(), "127.0.0.1") self.assertEqual(ticket.getAttempt(), 70) self.assertEqual(ticket.getMatches(), ["abc\n", "123\n", "ABC\n"]) # All for IP 127.0.0.1 for single jail - ticket = self.db.getBansMerged("127.0.0.1", jail=self.jail) + ticket = self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail) self.assertEqual(ticket.getIP(), "127.0.0.1") self.assertEqual(ticket.getAttempt(), 30) self.assertEqual(ticket.getMatches(), ["abc\n", "123\n"]) @@ -265,23 +265,23 @@ class DatabaseTest(LogCaptureTestCase): # Should cache result if no extra bans added self.assertEqual( id(ticket), - id(self.db.getBansMerged("127.0.0.1", jail=self.jail))) + id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) - newTicket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"]) + newTicket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"]) ticket.setAttempt(40) # Add ticket, but not for same IP, so cache still valid self.db.addBan(self.jail, newTicket) self.assertEqual( id(ticket), - id(self.db.getBansMerged("127.0.0.1", jail=self.jail))) + id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) - newTicket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"]) + newTicket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"]) ticket.setAttempt(40) self.db.addBan(self.jail, newTicket) # Added ticket, so cache should have been cleared self.assertNotEqual( id(ticket), - id(self.db.getBansMerged("127.0.0.1", jail=self.jail))) + id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) tickets = self.db.getBansMerged() self.assertEqual(len(tickets), 2) @@ -312,7 +312,7 @@ class DatabaseTest(LogCaptureTestCase): "action_checkainfo", os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo.py"), {}) - ticket = FailTicket("1.2.3.4", MyTime.time(), ['test', 'test']) + ticket = FailTicket(IPAddr("1.2.3.4"), MyTime.time(), ['test', 'test']) ticket.setAttempt(5) self.jail.putFailTicket(ticket) actions._Actions__checkBan() @@ -339,7 +339,7 @@ class DatabaseTest(LogCaptureTestCase): # Should leave jail self.testAddJail() self.db.addBan( - self.jail, FailTicket("127.0.0.1", MyTime.time(), ["abc\n"])) + self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time(), ["abc\n"])) self.db.delJail(self.jail) self.db.purge() # Should leave jail as ban present self.assertEqual(len(self.db.getJailNames()), 1) diff --git a/fail2ban/tests/failmanagertestcase.py b/fail2ban/tests/failmanagertestcase.py index a8c0f6fc..ee63e456 100644 --- a/fail2ban/tests/failmanagertestcase.py +++ b/fail2ban/tests/failmanagertestcase.py @@ -28,6 +28,7 @@ import unittest from ..server.failmanager import FailManager, FailManagerEmpty from ..server.ticket import FailTicket +from ..server.filter import IPAddr class AddFailure(unittest.TestCase): @@ -50,7 +51,7 @@ class AddFailure(unittest.TestCase): self.__failManager = FailManager() for i in self.__items: - self.__failManager.addFailure(FailTicket(i[0], i[1])) + self.__failManager.addFailure(FailTicket(IPAddr(i[0]), i[1])) def tearDown(self): """Call after every test case.""" @@ -69,8 +70,8 @@ class AddFailure(unittest.TestCase): self.__failManager.setMaxTime(600) def _testDel(self): - self.__failManager.delFailure('193.168.0.128') - self.__failManager.delFailure('111.111.1.111') + self.__failManager.delFailure(IPAddr('193.168.0.128')) + self.__failManager.delFailure(IPAddr('111.111.1.111')) self.assertEqual(self.__failManager.size(), 1) @@ -89,7 +90,7 @@ class AddFailure(unittest.TestCase): #ticket = FailTicket('193.168.0.128', None) ticket = self.__failManager.toBan() self.assertEqual(ticket.getIP(), "193.168.0.128") - self.assertTrue(isinstance(ticket.getIP(), str)) + self.assertTrue(isinstance(ticket.getIP(), IPAddr)) # finish with rudimentary tests of the ticket # verify consistent str diff --git a/fail2ban/tests/files/testcase-usedns.log b/fail2ban/tests/files/testcase-usedns.log index de484127..28d298f5 100644 --- a/fail2ban/tests/files/testcase-usedns.log +++ b/fail2ban/tests/files/testcase-usedns.log @@ -1,2 +1,3 @@ Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2 -Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2 +Aug 14 11:56:59 i60p295 sshd[12365]: Failed publickey for roehl from 172.31.0.34 port 51332 ssh2 +Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:172.31.0.34 port 51332 ssh2 diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 40879b66..ffd32155 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -38,7 +38,7 @@ except ImportError: from ..server.jail import Jail from ..server.filterpoll import FilterPoll -from ..server.filter import Filter, FileFilter, DNSUtils +from ..server.filter import Filter, FileFilter, DNSUtils, IPAddr from ..server.failmanager import FailManagerEmpty from ..server.mytime import MyTime from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase @@ -85,8 +85,14 @@ def _assert_equal_entries(utest, found, output, count=None): and report helpful failure reports instead of millions of seconds ;) """ - utest.assertEqual(found[0], output[0]) # IP - utest.assertEqual(found[1], count or output[1]) # count + # if DNS is involved, multiple addresses may be returned + if isinstance(output[0], list): + utest.assertIn(found[0], output[0]) # IP + utest.assertEqual(found[1], count or output[1]) # count + else: + utest.assertEqual(found[0], output[0]) # IP + utest.assertEqual(found[1], count or output[1]) # count + found_time, output_time = \ MyTime.localtime(found[2]),\ MyTime.localtime(output[2]) @@ -241,7 +247,7 @@ class IgnoreIP(LogCaptureTestCase): ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99" for ip in ipList: self.filter.addIgnoreIP(ip) - self.assertTrue(self.filter.inIgnoreIPList(ip)) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr(ip))) def testIgnoreIPNOK(self): ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0." @@ -251,21 +257,21 @@ class IgnoreIP(LogCaptureTestCase): def testIgnoreIPCIDR(self): self.filter.addIgnoreIP('192.168.1.0/25') - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0')) - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1')) - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255')) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0'))) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1'))) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255'))) def testIgnoreIPMask(self): self.filter.addIgnoreIP('192.168.1.0/255.255.255.128') - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0')) - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1')) - self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255')) - self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255')) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0'))) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1'))) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255'))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255'))) def testIgnoreInProcessLine(self): setUpMyTime() @@ -283,17 +289,17 @@ class IgnoreIP(LogCaptureTestCase): def testIgnoreCommand(self): self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py ")) - self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1")) - self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0")) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr("10.0.0.1"))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr("10.0.0.0"))) def testIgnoreCauseOK(self): ip = "93.184.216.34" for ignore_source in ["dns", "ip", "command"]: - self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source) + self.filter.logIgnoreIp(IPAddr(ip), True, ignore_source=ignore_source) self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)) def testIgnoreCauseNOK(self): - self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED") + self.filter.logIgnoreIp(IPAddr("example.com"), False, ignore_source="NOT_LOGGED") self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")) @@ -301,14 +307,14 @@ class IgnoreIPDNS(IgnoreIP): def testIgnoreIPDNSOK(self): self.filter.addIgnoreIP("www.epfl.ch") - self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12")) + self.assertTrue(self.filter.inIgnoreIPList(IPAddr("128.178.50.12"))) def testIgnoreIPDNSNOK(self): # Test DNS self.filter.addIgnoreIP("www.epfl.ch") - self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10")) - self.assertFalse(self.filter.inIgnoreIPList("128.178.50.11")) - self.assertFalse(self.filter.inIgnoreIPList("128.178.50.13")) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr("127.177.50.10"))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.11"))) + self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.13"))) class LogFile(LogCaptureTestCase): @@ -963,12 +969,12 @@ class GetFailures(LogCaptureTestCase): def testGetFailuresUseDNS(self): # We should still catch failures with usedns = no ;-) - output_yes = ('93.184.216.34', 2, 1124013539.0, - [u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2', - u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']) + output_yes = (['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'], 1, 1124013299.0, + [u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']) - output_no = ('93.184.216.34', 1, 1124013539.0, - [u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']) + output_no = ('172.31.0.34', 2, 1124013539.0, + [u'Aug 14 11:56:59 i60p295 sshd[12365]: Failed publickey for roehl from 172.31.0.34 port 51332 ssh2', + u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:172.31.0.34 port 51332 ssh2']) # Actually no exception would be raised -- it will be just set to 'no' #self.assertRaises(ValueError, @@ -1067,9 +1073,11 @@ class DNSUtilsTests(unittest.TestCase): res = DNSUtils.textToIp('www.example.com', 'no') self.assertEqual(res, []) res = DNSUtils.textToIp('www.example.com', 'warn') - self.assertEqual(res, ['93.184.216.34']) + self.assertIn('93.184.216.34', res) + self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res) res = DNSUtils.textToIp('www.example.com', 'yes') - self.assertEqual(res, ['93.184.216.34']) + self.assertIn('93.184.216.34', res) + self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res) def testTextToIp(self): # Test hostnames @@ -1081,32 +1089,31 @@ class DNSUtilsTests(unittest.TestCase): for s in hostnames: res = DNSUtils.textToIp(s, 'yes') if s == 'www.example.com': - self.assertEqual(res, ['93.184.216.34']) + self.assertIn('93.184.216.34', res) + self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res) else: self.assertEqual(res, []) def testIpToName(self): - res = DNSUtils.ipToName('8.8.4.4') + res = DNSUtils.ipToName(IPAddr('8.8.4.4')) + self.assertEqual(res, 'google-public-dns-b.google.com') + res = DNSUtils.ipToName(IPAddr('2001:4860:4860::8844')) self.assertEqual(res, 'google-public-dns-b.google.com') # invalid ip (TEST-NET-1 according to RFC 5737) - res = DNSUtils.ipToName('192.0.2.0') + res = DNSUtils.ipToName(IPAddr('192.0.2.0')) self.assertEqual(res, None) def testAddr2bin(self): - res = DNSUtils.addr2bin('10.0.0.0') - self.assertEqual(res, 167772160L) - res = DNSUtils.addr2bin('10.0.0.0', cidr=None) - self.assertEqual(res, 167772160L) - res = DNSUtils.addr2bin('10.0.0.0', cidr=32L) - self.assertEqual(res, 167772160L) - res = DNSUtils.addr2bin('10.0.0.1', cidr=32L) - self.assertEqual(res, 167772161L) - res = DNSUtils.addr2bin('10.0.0.1', cidr=31L) - self.assertEqual(res, 167772160L) - - def testBin2addr(self): - res = DNSUtils.bin2addr(167772160L) - self.assertEqual(res, '10.0.0.0') + res = IPAddr('10.0.0.0') + self.assertEqual(res.addr, 167772160L) + res = IPAddr('10.0.0.0', cidr=None) + self.assertEqual(res.addr, 167772160L) + res = IPAddr('10.0.0.0', cidr=32L) + self.assertEqual(res.addr, 167772160L) + res = IPAddr('10.0.0.1', cidr=32L) + self.assertEqual(res.addr, 167772161L) + res = IPAddr('10.0.0.1', cidr=31L) + self.assertEqual(res.addr, 167772160L) class JailTests(unittest.TestCase): diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index 07e10c7d..e14befce 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -36,6 +36,7 @@ from ..server.failregex import Regex, FailRegex, RegexException from ..server.server import Server from ..server.jail import Jail from ..server.jailthread import JailThread +from ..server.filter import IPAddr from .utils import LogCaptureTestCase from ..helpers import getLogger from .. import version @@ -110,18 +111,23 @@ class TransmitterBase(unittest.TestCase): cmdAdd = "add" + cmd cmdDel = "del" + cmd + # sorting IPAddr objects and strings differs so that the comparism values + # must also be sorted as IPAddr objects + # convert to IPAddr objects if values look like a list of IP address strings + ips = map(lambda x: IPAddr(x) if IPAddr.searchIP(x) else x , values) + self.assertEqual( self.transm.proceed(["get", jail, cmd]), (0, [])) for n, value in enumerate(values): ret = self.transm.proceed(["set", jail, cmdAdd, value]) - self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[:n+1]))) + self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[:n+1]))) ret = self.transm.proceed(["get", jail, cmd]) - self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[:n+1]))) - for n, value in enumerate(values): + self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[:n+1]))) + for n, value in enumerate(ips): ret = self.transm.proceed(["set", jail, cmdDel, value]) - self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[n+1:]))) + self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[n+1:]))) ret = self.transm.proceed(["get", jail, cmd]) - self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[n+1:]))) + self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[n+1:]))) def jailAddDelRegexTest(self, cmd, inValues, outValues, jail): cmdAdd = "add" + cmd