treat IP address as objects also in tests

pull/1374/head
Alexander Koeppe 2016-03-15 23:08:10 +01:00
parent a9d691b0f5
commit dcfa8c5aa4
6 changed files with 103 additions and 86 deletions

View File

@ -29,11 +29,13 @@ import unittest
from ..server.banmanager import BanManager
from ..server.ticket import BanTicket
from .utils import assert_dict_equal
from ..server.filter import IPAddr
class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__ticket = BanTicket('193.168.0.128', 1167605999.0)
self.__ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0)
self.__banManager = BanManager()
self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
@ -49,18 +51,18 @@ class AddFailure(unittest.TestCase):
self.assertEqual(self.__banManager.size(), 1)
def testInListOK(self):
ticket = BanTicket('193.168.0.128', 1167605999.0)
ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0)
self.assertTrue(self.__banManager._inBanList(ticket))
def testInListNOK(self):
ticket = BanTicket('111.111.1.111', 1167605999.0)
ticket = BanTicket(IPAddr('111.111.1.111'), 1167605999.0)
self.assertFalse(self.__banManager._inBanList(ticket))
class StatusExtendedCymruInfo(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__ban_ip = "93.184.216.34"
self.__ban_ip = IPAddr("93.184.216.34")
self.__asn = "15133"
self.__country = "EU"
self.__rir = "ripencc"

View File

@ -29,7 +29,7 @@ import tempfile
import sqlite3
import shutil
from ..server.filter import FileContainer
from ..server.filter import FileContainer, IPAddr
from ..server.mytime import MyTime
from ..server.ticket import FailTicket
from ..server.actions import Actions
@ -97,7 +97,7 @@ class DatabaseTest(LogCaptureTestCase):
self.db = Fail2BanDb(self.dbFilename)
self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets']))
self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2BanDb_pUlZJh.log']))
ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n"])
ticket = FailTicket(IPAddr("127.0.0.1"), 1388009242.26, [u"abc\n"])
self.assertEqual(self.db.getBans()[0], ticket)
self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2BanDb.__version__)
@ -171,7 +171,7 @@ class DatabaseTest(LogCaptureTestCase):
if Fail2BanDb is None: # pragma: no cover
return
self.testAddJail()
ticket = FailTicket("127.0.0.1", 0, ["abc\n"])
ticket = FailTicket(IPAddr("127.0.0.1"), 0, ["abc\n"])
self.db.addBan(self.jail, ticket)
self.assertEqual(len(self.db.getBans(jail=self.jail)), 1)
@ -184,9 +184,9 @@ class DatabaseTest(LogCaptureTestCase):
self.testAddJail()
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
tickets = [
FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')])
FailTicket(IPAddr("127.0.0.1"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket(IPAddr("127.0.0.2"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket(IPAddr("127.0.0.3"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')])
]
self.db.addBan(self.jail, tickets[0])
self.db.addBan(self.jail, tickets[1])
@ -197,15 +197,15 @@ class DatabaseTest(LogCaptureTestCase):
## python 2 or 3 :
invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace')
self.assertTrue(
readtickets[0] == FailTicket("127.0.0.1", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
readtickets[0] == FailTicket(IPAddr("127.0.0.1"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[0] == tickets[0]
)
self.assertTrue(
readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')])
readtickets[1] == FailTicket(IPAddr("127.0.0.2"), 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')])
or readtickets[1] == tickets[1]
)
self.assertTrue(
readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
readtickets[2] == FailTicket(IPAddr("127.0.0.3"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[2] == tickets[2]
)
@ -220,9 +220,9 @@ class DatabaseTest(LogCaptureTestCase):
return
self.testAddJail()
self.db.addBan(
self.jail, FailTicket("127.0.0.1", MyTime.time() - 60, ["abc\n"]))
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 60, ["abc\n"]))
self.db.addBan(
self.jail, FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"]))
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"]))
self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=50)), 1)
self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=20)), 0)
# Negative values are for persistent bans, and such all bans should
@ -237,27 +237,27 @@ class DatabaseTest(LogCaptureTestCase):
jail2 = DummyJail()
self.db.addJail(jail2)
ticket = FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"])
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"])
ticket.setAttempt(10)
self.db.addBan(self.jail, ticket)
ticket = FailTicket("127.0.0.1", MyTime.time() - 30, ["123\n"])
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 30, ["123\n"])
ticket.setAttempt(20)
self.db.addBan(self.jail, ticket)
ticket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"])
ticket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"])
ticket.setAttempt(30)
self.db.addBan(self.jail, ticket)
ticket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"])
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"])
ticket.setAttempt(40)
self.db.addBan(jail2, ticket)
# All for IP 127.0.0.1
ticket = self.db.getBansMerged("127.0.0.1")
ticket = self.db.getBansMerged(IPAddr("127.0.0.1"))
self.assertEqual(ticket.getIP(), "127.0.0.1")
self.assertEqual(ticket.getAttempt(), 70)
self.assertEqual(ticket.getMatches(), ["abc\n", "123\n", "ABC\n"])
# All for IP 127.0.0.1 for single jail
ticket = self.db.getBansMerged("127.0.0.1", jail=self.jail)
ticket = self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail)
self.assertEqual(ticket.getIP(), "127.0.0.1")
self.assertEqual(ticket.getAttempt(), 30)
self.assertEqual(ticket.getMatches(), ["abc\n", "123\n"])
@ -265,23 +265,23 @@ class DatabaseTest(LogCaptureTestCase):
# Should cache result if no extra bans added
self.assertEqual(
id(ticket),
id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail)))
newTicket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"])
newTicket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"])
ticket.setAttempt(40)
# Add ticket, but not for same IP, so cache still valid
self.db.addBan(self.jail, newTicket)
self.assertEqual(
id(ticket),
id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail)))
newTicket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"])
newTicket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"])
ticket.setAttempt(40)
self.db.addBan(self.jail, newTicket)
# Added ticket, so cache should have been cleared
self.assertNotEqual(
id(ticket),
id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail)))
tickets = self.db.getBansMerged()
self.assertEqual(len(tickets), 2)
@ -312,7 +312,7 @@ class DatabaseTest(LogCaptureTestCase):
"action_checkainfo",
os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo.py"),
{})
ticket = FailTicket("1.2.3.4", MyTime.time(), ['test', 'test'])
ticket = FailTicket(IPAddr("1.2.3.4"), MyTime.time(), ['test', 'test'])
ticket.setAttempt(5)
self.jail.putFailTicket(ticket)
actions._Actions__checkBan()
@ -339,7 +339,7 @@ class DatabaseTest(LogCaptureTestCase):
# Should leave jail
self.testAddJail()
self.db.addBan(
self.jail, FailTicket("127.0.0.1", MyTime.time(), ["abc\n"]))
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time(), ["abc\n"]))
self.db.delJail(self.jail)
self.db.purge() # Should leave jail as ban present
self.assertEqual(len(self.db.getJailNames()), 1)

View File

@ -28,6 +28,7 @@ import unittest
from ..server.failmanager import FailManager, FailManagerEmpty
from ..server.ticket import FailTicket
from ..server.filter import IPAddr
class AddFailure(unittest.TestCase):
@ -50,7 +51,7 @@ class AddFailure(unittest.TestCase):
self.__failManager = FailManager()
for i in self.__items:
self.__failManager.addFailure(FailTicket(i[0], i[1]))
self.__failManager.addFailure(FailTicket(IPAddr(i[0]), i[1]))
def tearDown(self):
"""Call after every test case."""
@ -69,8 +70,8 @@ class AddFailure(unittest.TestCase):
self.__failManager.setMaxTime(600)
def _testDel(self):
self.__failManager.delFailure('193.168.0.128')
self.__failManager.delFailure('111.111.1.111')
self.__failManager.delFailure(IPAddr('193.168.0.128'))
self.__failManager.delFailure(IPAddr('111.111.1.111'))
self.assertEqual(self.__failManager.size(), 1)
@ -89,7 +90,7 @@ class AddFailure(unittest.TestCase):
#ticket = FailTicket('193.168.0.128', None)
ticket = self.__failManager.toBan()
self.assertEqual(ticket.getIP(), "193.168.0.128")
self.assertTrue(isinstance(ticket.getIP(), str))
self.assertTrue(isinstance(ticket.getIP(), IPAddr))
# finish with rudimentary tests of the ticket
# verify consistent str

View File

@ -1,2 +1,3 @@
Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2
Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2
Aug 14 11:56:59 i60p295 sshd[12365]: Failed publickey for roehl from 172.31.0.34 port 51332 ssh2
Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:172.31.0.34 port 51332 ssh2

View File

@ -38,7 +38,7 @@ except ImportError:
from ..server.jail import Jail
from ..server.filterpoll import FilterPoll
from ..server.filter import Filter, FileFilter, DNSUtils
from ..server.filter import Filter, FileFilter, DNSUtils, IPAddr
from ..server.failmanager import FailManagerEmpty
from ..server.mytime import MyTime
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase
@ -85,8 +85,14 @@ def _assert_equal_entries(utest, found, output, count=None):
and report helpful failure reports instead of millions of seconds ;)
"""
# if DNS is involved, multiple addresses may be returned
if isinstance(output[0], list):
utest.assertIn(found[0], output[0]) # IP
utest.assertEqual(found[1], count or output[1]) # count
else:
utest.assertEqual(found[0], output[0]) # IP
utest.assertEqual(found[1], count or output[1]) # count
found_time, output_time = \
MyTime.localtime(found[2]),\
MyTime.localtime(output[2])
@ -241,7 +247,7 @@ class IgnoreIP(LogCaptureTestCase):
ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
for ip in ipList:
self.filter.addIgnoreIP(ip)
self.assertTrue(self.filter.inIgnoreIPList(ip))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr(ip)))
def testIgnoreIPNOK(self):
ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0."
@ -251,21 +257,21 @@ class IgnoreIP(LogCaptureTestCase):
def testIgnoreIPCIDR(self):
self.filter.addIgnoreIP('192.168.1.0/25')
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0')))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1')))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255')))
def testIgnoreIPMask(self):
self.filter.addIgnoreIP('192.168.1.0/255.255.255.128')
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0')))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1')))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255')))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255')))
def testIgnoreInProcessLine(self):
setUpMyTime()
@ -283,17 +289,17 @@ class IgnoreIP(LogCaptureTestCase):
def testIgnoreCommand(self):
self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>"))
self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr("10.0.0.1")))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("10.0.0.0")))
def testIgnoreCauseOK(self):
ip = "93.184.216.34"
for ignore_source in ["dns", "ip", "command"]:
self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
self.filter.logIgnoreIp(IPAddr(ip), True, ignore_source=ignore_source)
self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))
def testIgnoreCauseNOK(self):
self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
self.filter.logIgnoreIp(IPAddr("example.com"), False, ignore_source="NOT_LOGGED")
self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
@ -301,14 +307,14 @@ class IgnoreIPDNS(IgnoreIP):
def testIgnoreIPDNSOK(self):
self.filter.addIgnoreIP("www.epfl.ch")
self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12"))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr("128.178.50.12")))
def testIgnoreIPDNSNOK(self):
# Test DNS
self.filter.addIgnoreIP("www.epfl.ch")
self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10"))
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.11"))
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.13"))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("127.177.50.10")))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.11")))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.13")))
class LogFile(LogCaptureTestCase):
@ -963,12 +969,12 @@ class GetFailures(LogCaptureTestCase):
def testGetFailuresUseDNS(self):
# We should still catch failures with usedns = no ;-)
output_yes = ('93.184.216.34', 2, 1124013539.0,
[u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2',
u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2'])
output_yes = (['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'], 1, 1124013299.0,
[u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2'])
output_no = ('93.184.216.34', 1, 1124013539.0,
[u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2'])
output_no = ('172.31.0.34', 2, 1124013539.0,
[u'Aug 14 11:56:59 i60p295 sshd[12365]: Failed publickey for roehl from 172.31.0.34 port 51332 ssh2',
u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:172.31.0.34 port 51332 ssh2'])
# Actually no exception would be raised -- it will be just set to 'no'
#self.assertRaises(ValueError,
@ -1067,9 +1073,11 @@ class DNSUtilsTests(unittest.TestCase):
res = DNSUtils.textToIp('www.example.com', 'no')
self.assertEqual(res, [])
res = DNSUtils.textToIp('www.example.com', 'warn')
self.assertEqual(res, ['93.184.216.34'])
self.assertIn('93.184.216.34', res)
self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res)
res = DNSUtils.textToIp('www.example.com', 'yes')
self.assertEqual(res, ['93.184.216.34'])
self.assertIn('93.184.216.34', res)
self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res)
def testTextToIp(self):
# Test hostnames
@ -1081,32 +1089,31 @@ class DNSUtilsTests(unittest.TestCase):
for s in hostnames:
res = DNSUtils.textToIp(s, 'yes')
if s == 'www.example.com':
self.assertEqual(res, ['93.184.216.34'])
self.assertIn('93.184.216.34', res)
self.assertIn('2606:2800:220:1:248:1893:25c8:1946', res)
else:
self.assertEqual(res, [])
def testIpToName(self):
res = DNSUtils.ipToName('8.8.4.4')
res = DNSUtils.ipToName(IPAddr('8.8.4.4'))
self.assertEqual(res, 'google-public-dns-b.google.com')
res = DNSUtils.ipToName(IPAddr('2001:4860:4860::8844'))
self.assertEqual(res, 'google-public-dns-b.google.com')
# invalid ip (TEST-NET-1 according to RFC 5737)
res = DNSUtils.ipToName('192.0.2.0')
res = DNSUtils.ipToName(IPAddr('192.0.2.0'))
self.assertEqual(res, None)
def testAddr2bin(self):
res = DNSUtils.addr2bin('10.0.0.0')
self.assertEqual(res, 167772160L)
res = DNSUtils.addr2bin('10.0.0.0', cidr=None)
self.assertEqual(res, 167772160L)
res = DNSUtils.addr2bin('10.0.0.0', cidr=32L)
self.assertEqual(res, 167772160L)
res = DNSUtils.addr2bin('10.0.0.1', cidr=32L)
self.assertEqual(res, 167772161L)
res = DNSUtils.addr2bin('10.0.0.1', cidr=31L)
self.assertEqual(res, 167772160L)
def testBin2addr(self):
res = DNSUtils.bin2addr(167772160L)
self.assertEqual(res, '10.0.0.0')
res = IPAddr('10.0.0.0')
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.0', cidr=None)
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.0', cidr=32L)
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.1', cidr=32L)
self.assertEqual(res.addr, 167772161L)
res = IPAddr('10.0.0.1', cidr=31L)
self.assertEqual(res.addr, 167772160L)
class JailTests(unittest.TestCase):

View File

@ -36,6 +36,7 @@ from ..server.failregex import Regex, FailRegex, RegexException
from ..server.server import Server
from ..server.jail import Jail
from ..server.jailthread import JailThread
from ..server.filter import IPAddr
from .utils import LogCaptureTestCase
from ..helpers import getLogger
from .. import version
@ -110,18 +111,23 @@ class TransmitterBase(unittest.TestCase):
cmdAdd = "add" + cmd
cmdDel = "del" + cmd
# sorting IPAddr objects and strings differs so that the comparism values
# must also be sorted as IPAddr objects
# convert to IPAddr objects if values look like a list of IP address strings
ips = map(lambda x: IPAddr(x) if IPAddr.searchIP(x) else x , values)
self.assertEqual(
self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdAdd, value])
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[:n+1])))
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[:n+1])))
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[:n+1])))
for n, value in enumerate(values):
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[:n+1])))
for n, value in enumerate(ips):
ret = self.transm.proceed(["set", jail, cmdDel, value])
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[n+1:])))
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[n+1:])))
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(values[n+1:])))
self.assertEqual((ret[0], sorted(ret[1])), (0, sorted(ips[n+1:])))
def jailAddDelRegexTest(self, cmd, inValues, outValues, jail):
cmdAdd = "add" + cmd