remove unnecessary changes due to introduction of iparg decorator

pull/1374/head
Alexander Koeppe 2016-04-08 22:55:32 +02:00
parent caa4521169
commit fd36287354
9 changed files with 75 additions and 67 deletions

View File

@ -42,8 +42,8 @@ from .banmanager import BanManager
from .jailthread import JailThread from .jailthread import JailThread
from .action import ActionBase, CommandAction, CallingMap from .action import ActionBase, CommandAction, CallingMap
from .mytime import MyTime from .mytime import MyTime
from ..ipaddr import IPAddr
from ..helpers import getLogger from ..helpers import getLogger
from ..ipaddr import iparg
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -179,7 +179,8 @@ class Actions(JailThread, Mapping):
def getBanTime(self): def getBanTime(self):
return self.__banManager.getBanTime() return self.__banManager.getBanTime()
def removeBannedIP(self, ipstr): @iparg
def removeBannedIP(self, ip):
"""Removes banned IP calling actions' unban method """Removes banned IP calling actions' unban method
Remove a banned IP now, rather than waiting for it to expire, Remove a banned IP now, rather than waiting for it to expire,
@ -187,16 +188,14 @@ class Actions(JailThread, Mapping):
Parameters Parameters
---------- ----------
ipstr : str ip : str
The IP address string to unban The IP address to unban
Raises Raises
------ ------
ValueError ValueError
If `ip` is not banned If `ip` is not banned
""" """
# Create new IPAddr object from IP string
ip = IPAddr(ipstr)
# Always delete ip from database (also if currently not banned) # Always delete ip from database (also if currently not banned)
if self._jail.database is not None: if self._jail.database is not None:
self._jail.database.delBan(self._jail, ip) self._jail.database.delBan(self._jail, ip)

View File

@ -32,7 +32,7 @@ from threading import RLock
from .mytime import MyTime from .mytime import MyTime
from .ticket import FailTicket from .ticket import FailTicket
from ..ipaddr import IPAddr from ..ipaddr import IPAddr, iparg
from ..helpers import getLogger from ..helpers import getLogger
# Gets the instance of the logger. # Gets the instance of the logger.
@ -479,6 +479,7 @@ class Fail2BanDb(object):
tickets[-1].setAttempt(data.get('failures', 1)) tickets[-1].setAttempt(data.get('failures', 1))
return tickets return tickets
@iparg
def getBansMerged(self, ip=None, jail=None, bantime=None): def getBansMerged(self, ip=None, jail=None, bantime=None):
"""Get bans from the database, merged into single ticket. """Get bans from the database, merged into single ticket.

View File

@ -30,6 +30,7 @@ import logging
from .faildata import FailData from .faildata import FailData
from .ticket import FailTicket from .ticket import FailTicket
from ..helpers import getLogger from ..helpers import getLogger
from ..ipaddr import iparg
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -136,6 +137,7 @@ class FailManager:
finally: finally:
self.__lock.release() self.__lock.release()
@iparg
def __delFailure(self, ip): def __delFailure(self, ip):
if ip in self.__failList: if ip in self.__failList:
del self.__failList[ip] del self.__failList[ip]

View File

@ -38,7 +38,7 @@ from .mytime import MyTime
from .failregex import FailRegex, Regex, RegexException from .failregex import FailRegex, Regex, RegexException
from .action import CommandAction from .action import CommandAction
from ..helpers import getLogger from ..helpers import getLogger
from ..ipaddr import IPAddr from ..ipaddr import IPAddr, iparg
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -307,8 +307,9 @@ class Filter(JailThread):
## ##
# create new IPAddr object from IP address string # create new IPAddr object from IP address string
def newIP(self, ipstr): @iparg
return IPAddr(ipstr) def newIP(self, ip):
return ip
## ##
# Ban an IP - http://blogs.buanzo.com.ar/2009/04/fail2ban-patch-ban-ip-address-manually.html # Ban an IP - http://blogs.buanzo.com.ar/2009/04/fail2ban-patch-ban-ip-address-manually.html
@ -316,8 +317,8 @@ class Filter(JailThread):
# #
# to enable banip fail2ban-client BAN command # to enable banip fail2ban-client BAN command
def addBannedIP(self, ipstr): @iparg
ip = IPAddr(ipstr) def addBannedIP(self, ip):
if self.inIgnoreIPList(ip): if self.inIgnoreIPList(ip):
logSys.warning('Requested to manually ban an ignored IP %s. User knows best. Proceeding to ban it.' % ip) logSys.warning('Requested to manually ban an ignored IP %s. User knows best. Proceeding to ban it.' % ip)
@ -361,10 +362,12 @@ class Filter(JailThread):
logSys.debug("Add " + ip + " to ignore list") logSys.debug("Add " + ip + " to ignore list")
self.__ignoreIpList.append(ip) self.__ignoreIpList.append(ip)
@iparg
def delIgnoreIP(self, ip): def delIgnoreIP(self, ip):
logSys.debug("Remove " + ip + " from ignore list") logSys.debug("Remove " + ip + " from ignore list")
self.__ignoreIpList.remove(ip) self.__ignoreIpList.remove(ip)
@iparg
def logIgnoreIp(self, ip, log_ignore, ignore_source="unknown source"): def logIgnoreIp(self, ip, log_ignore, ignore_source="unknown source"):
if log_ignore: if log_ignore:
logSys.info("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)) logSys.info("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))
@ -380,6 +383,7 @@ class Filter(JailThread):
# @param ip IP address object # @param ip IP address object
# @return True if IP address is in ignore list # @return True if IP address is in ignore list
@iparg
def inIgnoreIPList(self, ip, log_ignore=False): def inIgnoreIPList(self, ip, log_ignore=False):
for net in self.__ignoreIpList: for net in self.__ignoreIpList:
# if it isn't a valid IP address, try DNS resolution # if it isn't a valid IP address, try DNS resolution
@ -881,6 +885,7 @@ class DNSUtils:
return list() return list()
@staticmethod @staticmethod
@iparg
def ipToName(ip): def ipToName(ip):
try: try:
return socket.gethostbyaddr(ip.ntoa())[0] return socket.gethostbyaddr(ip.ntoa())[0]

View File

@ -25,6 +25,7 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL" __license__ = "GPL"
from ..helpers import getLogger from ..helpers import getLogger
from ..ipaddr import iparg
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -32,6 +33,7 @@ logSys = getLogger(__name__)
class Ticket: class Ticket:
@iparg
def __init__(self, ip, time, matches=None): def __init__(self, ip, time, matches=None):
"""Ticket constructor """Ticket constructor

View File

@ -35,7 +35,7 @@ from ..ipaddr import IPAddr
class AddFailure(unittest.TestCase): class AddFailure(unittest.TestCase):
def setUp(self): def setUp(self):
"""Call before every test case.""" """Call before every test case."""
self.__ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0) self.__ticket = BanTicket('193.168.0.128', 1167605999.0)
self.__banManager = BanManager() self.__banManager = BanManager()
self.assertTrue(self.__banManager.addBanTicket(self.__ticket)) self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
@ -51,18 +51,18 @@ class AddFailure(unittest.TestCase):
self.assertEqual(self.__banManager.size(), 1) self.assertEqual(self.__banManager.size(), 1)
def testInListOK(self): def testInListOK(self):
ticket = BanTicket(IPAddr('193.168.0.128'), 1167605999.0) ticket = BanTicket('193.168.0.128', 1167605999.0)
self.assertTrue(self.__banManager._inBanList(ticket)) self.assertTrue(self.__banManager._inBanList(ticket))
def testInListNOK(self): def testInListNOK(self):
ticket = BanTicket(IPAddr('111.111.1.111'), 1167605999.0) ticket = BanTicket('111.111.1.111', 1167605999.0)
self.assertFalse(self.__banManager._inBanList(ticket)) self.assertFalse(self.__banManager._inBanList(ticket))
class StatusExtendedCymruInfo(unittest.TestCase): class StatusExtendedCymruInfo(unittest.TestCase):
def setUp(self): def setUp(self):
"""Call before every test case.""" """Call before every test case."""
self.__ban_ip = IPAddr("93.184.216.34") self.__ban_ip = "93.184.216.34"
self.__asn = "15133" self.__asn = "15133"
self.__country = "EU" self.__country = "EU"
self.__rir = "ripencc" self.__rir = "ripencc"

View File

@ -33,7 +33,6 @@ from ..server.filter import FileContainer
from ..server.mytime import MyTime from ..server.mytime import MyTime
from ..server.ticket import FailTicket from ..server.ticket import FailTicket
from ..server.actions import Actions from ..server.actions import Actions
from ..ipaddr import IPAddr
from .dummyjail import DummyJail from .dummyjail import DummyJail
try: try:
from ..server.database import Fail2BanDb from ..server.database import Fail2BanDb
@ -98,7 +97,7 @@ class DatabaseTest(LogCaptureTestCase):
self.db = Fail2BanDb(self.dbFilename) self.db = Fail2BanDb(self.dbFilename)
self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets'])) self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets']))
self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2BanDb_pUlZJh.log'])) self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2BanDb_pUlZJh.log']))
ticket = FailTicket(IPAddr("127.0.0.1"), 1388009242.26, [u"abc\n"]) ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n"])
self.assertEqual(self.db.getBans()[0], ticket) self.assertEqual(self.db.getBans()[0], ticket)
self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2BanDb.__version__) self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2BanDb.__version__)
@ -172,7 +171,7 @@ class DatabaseTest(LogCaptureTestCase):
if Fail2BanDb is None: # pragma: no cover if Fail2BanDb is None: # pragma: no cover
return return
self.testAddJail() self.testAddJail()
ticket = FailTicket(IPAddr("127.0.0.1"), 0, ["abc\n"]) ticket = FailTicket("127.0.0.1", 0, ["abc\n"])
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
self.assertEqual(len(self.db.getBans(jail=self.jail)), 1) self.assertEqual(len(self.db.getBans(jail=self.jail)), 1)
@ -185,9 +184,9 @@ class DatabaseTest(LogCaptureTestCase):
self.testAddJail() self.testAddJail()
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ... # invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
tickets = [ tickets = [
FailTicket(IPAddr("127.0.0.1"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket(IPAddr("127.0.0.2"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket(IPAddr("127.0.0.3"), 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')]) FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')])
] ]
self.db.addBan(self.jail, tickets[0]) self.db.addBan(self.jail, tickets[0])
self.db.addBan(self.jail, tickets[1]) self.db.addBan(self.jail, tickets[1])
@ -198,15 +197,15 @@ class DatabaseTest(LogCaptureTestCase):
## python 2 or 3 : ## python 2 or 3 :
invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace') invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace')
self.assertTrue( self.assertTrue(
readtickets[0] == FailTicket(IPAddr("127.0.0.1"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) readtickets[0] == FailTicket("127.0.0.1", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[0] == tickets[0] or readtickets[0] == tickets[0]
) )
self.assertTrue( self.assertTrue(
readtickets[1] == FailTicket(IPAddr("127.0.0.2"), 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')]) readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')])
or readtickets[1] == tickets[1] or readtickets[1] == tickets[1]
) )
self.assertTrue( self.assertTrue(
readtickets[2] == FailTicket(IPAddr("127.0.0.3"), 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[2] == tickets[2] or readtickets[2] == tickets[2]
) )
@ -221,9 +220,9 @@ class DatabaseTest(LogCaptureTestCase):
return return
self.testAddJail() self.testAddJail()
self.db.addBan( self.db.addBan(
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 60, ["abc\n"])) self.jail, FailTicket("127.0.0.1", MyTime.time() - 60, ["abc\n"]))
self.db.addBan( self.db.addBan(
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"])) self.jail, FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"]))
self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=50)), 1) self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=50)), 1)
self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=20)), 0) self.assertEqual(len(self.db.getBans(jail=self.jail,bantime=20)), 0)
# Negative values are for persistent bans, and such all bans should # Negative values are for persistent bans, and such all bans should
@ -238,27 +237,27 @@ class DatabaseTest(LogCaptureTestCase):
jail2 = DummyJail() jail2 = DummyJail()
self.db.addJail(jail2) self.db.addJail(jail2)
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 40, ["abc\n"]) ticket = FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"])
ticket.setAttempt(10) ticket.setAttempt(10)
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 30, ["123\n"]) ticket = FailTicket("127.0.0.1", MyTime.time() - 30, ["123\n"])
ticket.setAttempt(20) ticket.setAttempt(20)
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"]) ticket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"])
ticket.setAttempt(30) ticket.setAttempt(30)
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"]) ticket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"])
ticket.setAttempt(40) ticket.setAttempt(40)
self.db.addBan(jail2, ticket) self.db.addBan(jail2, ticket)
# All for IP 127.0.0.1 # All for IP 127.0.0.1
ticket = self.db.getBansMerged(IPAddr("127.0.0.1")) ticket = self.db.getBansMerged("127.0.0.1")
self.assertEqual(ticket.getIP(), "127.0.0.1") self.assertEqual(ticket.getIP(), "127.0.0.1")
self.assertEqual(ticket.getAttempt(), 70) self.assertEqual(ticket.getAttempt(), 70)
self.assertEqual(ticket.getMatches(), ["abc\n", "123\n", "ABC\n"]) self.assertEqual(ticket.getMatches(), ["abc\n", "123\n", "ABC\n"])
# All for IP 127.0.0.1 for single jail # All for IP 127.0.0.1 for single jail
ticket = self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail) ticket = self.db.getBansMerged("127.0.0.1", jail=self.jail)
self.assertEqual(ticket.getIP(), "127.0.0.1") self.assertEqual(ticket.getIP(), "127.0.0.1")
self.assertEqual(ticket.getAttempt(), 30) self.assertEqual(ticket.getAttempt(), 30)
self.assertEqual(ticket.getMatches(), ["abc\n", "123\n"]) self.assertEqual(ticket.getMatches(), ["abc\n", "123\n"])
@ -266,23 +265,23 @@ class DatabaseTest(LogCaptureTestCase):
# Should cache result if no extra bans added # Should cache result if no extra bans added
self.assertEqual( self.assertEqual(
id(ticket), id(ticket),
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
newTicket = FailTicket(IPAddr("127.0.0.2"), MyTime.time() - 20, ["ABC\n"]) newTicket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"])
ticket.setAttempt(40) ticket.setAttempt(40)
# Add ticket, but not for same IP, so cache still valid # Add ticket, but not for same IP, so cache still valid
self.db.addBan(self.jail, newTicket) self.db.addBan(self.jail, newTicket)
self.assertEqual( self.assertEqual(
id(ticket), id(ticket),
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
newTicket = FailTicket(IPAddr("127.0.0.1"), MyTime.time() - 10, ["ABC\n"]) newTicket = FailTicket("127.0.0.1", MyTime.time() - 10, ["ABC\n"])
ticket.setAttempt(40) ticket.setAttempt(40)
self.db.addBan(self.jail, newTicket) self.db.addBan(self.jail, newTicket)
# Added ticket, so cache should have been cleared # Added ticket, so cache should have been cleared
self.assertNotEqual( self.assertNotEqual(
id(ticket), id(ticket),
id(self.db.getBansMerged(IPAddr("127.0.0.1"), jail=self.jail))) id(self.db.getBansMerged("127.0.0.1", jail=self.jail)))
tickets = self.db.getBansMerged() tickets = self.db.getBansMerged()
self.assertEqual(len(tickets), 2) self.assertEqual(len(tickets), 2)
@ -313,7 +312,7 @@ class DatabaseTest(LogCaptureTestCase):
"action_checkainfo", "action_checkainfo",
os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo.py"), os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo.py"),
{}) {})
ticket = FailTicket(IPAddr("1.2.3.4"), MyTime.time(), ['test', 'test']) ticket = FailTicket("1.2.3.4", MyTime.time(), ['test', 'test'])
ticket.setAttempt(5) ticket.setAttempt(5)
self.jail.putFailTicket(ticket) self.jail.putFailTicket(ticket)
actions._Actions__checkBan() actions._Actions__checkBan()
@ -340,7 +339,7 @@ class DatabaseTest(LogCaptureTestCase):
# Should leave jail # Should leave jail
self.testAddJail() self.testAddJail()
self.db.addBan( self.db.addBan(
self.jail, FailTicket(IPAddr("127.0.0.1"), MyTime.time(), ["abc\n"])) self.jail, FailTicket("127.0.0.1", MyTime.time(), ["abc\n"]))
self.db.delJail(self.jail) self.db.delJail(self.jail)
self.db.purge() # Should leave jail as ban present self.db.purge() # Should leave jail as ban present
self.assertEqual(len(self.db.getJailNames()), 1) self.assertEqual(len(self.db.getJailNames()), 1)

View File

@ -51,7 +51,7 @@ class AddFailure(unittest.TestCase):
self.__failManager = FailManager() self.__failManager = FailManager()
for i in self.__items: for i in self.__items:
self.__failManager.addFailure(FailTicket(IPAddr(i[0]), i[1])) self.__failManager.addFailure(FailTicket(i[0], i[1]))
def tearDown(self): def tearDown(self):
"""Call after every test case.""" """Call after every test case."""
@ -70,8 +70,8 @@ class AddFailure(unittest.TestCase):
self.__failManager.setMaxTime(600) self.__failManager.setMaxTime(600)
def _testDel(self): def _testDel(self):
self.__failManager.delFailure(IPAddr('193.168.0.128')) self.__failManager.delFailure('193.168.0.128')
self.__failManager.delFailure(IPAddr('111.111.1.111')) self.__failManager.delFailure('111.111.1.111')
self.assertEqual(self.__failManager.size(), 1) self.assertEqual(self.__failManager.size(), 1)

View File

@ -248,7 +248,7 @@ class IgnoreIP(LogCaptureTestCase):
ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99" ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
for ip in ipList: for ip in ipList:
self.filter.addIgnoreIP(ip) self.filter.addIgnoreIP(ip)
self.assertTrue(self.filter.inIgnoreIPList(IPAddr(ip))) self.assertTrue(self.filter.inIgnoreIPList(ip))
def testIgnoreIPNOK(self): def testIgnoreIPNOK(self):
ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0." ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0."
@ -258,21 +258,21 @@ class IgnoreIP(LogCaptureTestCase):
def testIgnoreIPCIDR(self): def testIgnoreIPCIDR(self):
self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addIgnoreIP('192.168.1.0/25')
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))
def testIgnoreIPMask(self): def testIgnoreIPMask(self):
self.filter.addIgnoreIP('192.168.1.0/255.255.255.128') self.filter.addIgnoreIP('192.168.1.0/255.255.255.128')
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.0'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.1'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.168.1.127'))) self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.128'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.1.255'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.168.0.255'))) self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))
def testIgnoreInProcessLine(self): def testIgnoreInProcessLine(self):
setUpMyTime() setUpMyTime()
@ -290,17 +290,17 @@ class IgnoreIP(LogCaptureTestCase):
def testIgnoreCommand(self): def testIgnoreCommand(self):
self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>")) self.filter.setIgnoreCommand(sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>"))
self.assertTrue(self.filter.inIgnoreIPList(IPAddr("10.0.0.1"))) self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("10.0.0.0"))) self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
def testIgnoreCauseOK(self): def testIgnoreCauseOK(self):
ip = "93.184.216.34" ip = "93.184.216.34"
for ignore_source in ["dns", "ip", "command"]: for ignore_source in ["dns", "ip", "command"]:
self.filter.logIgnoreIp(IPAddr(ip), True, ignore_source=ignore_source) self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)) self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))
def testIgnoreCauseNOK(self): def testIgnoreCauseNOK(self):
self.filter.logIgnoreIp(IPAddr("example.com"), False, ignore_source="NOT_LOGGED") self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")) self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
@ -308,14 +308,14 @@ class IgnoreIPDNS(IgnoreIP):
def testIgnoreIPDNSOK(self): def testIgnoreIPDNSOK(self):
self.filter.addIgnoreIP("www.epfl.ch") self.filter.addIgnoreIP("www.epfl.ch")
self.assertTrue(self.filter.inIgnoreIPList(IPAddr("128.178.50.12"))) self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12"))
def testIgnoreIPDNSNOK(self): def testIgnoreIPDNSNOK(self):
# Test DNS # Test DNS
self.filter.addIgnoreIP("www.epfl.ch") self.filter.addIgnoreIP("www.epfl.ch")
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("127.177.50.10"))) self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10"))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.11"))) self.assertFalse(self.filter.inIgnoreIPList("128.178.50.11"))
self.assertFalse(self.filter.inIgnoreIPList(IPAddr("128.178.50.13"))) self.assertFalse(self.filter.inIgnoreIPList("128.178.50.13"))
class LogFile(LogCaptureTestCase): class LogFile(LogCaptureTestCase):
@ -1096,12 +1096,12 @@ class DNSUtilsTests(unittest.TestCase):
self.assertEqual(res, []) self.assertEqual(res, [])
def testIpToName(self): def testIpToName(self):
res = DNSUtils.ipToName(IPAddr('8.8.4.4')) res = DNSUtils.ipToName('8.8.4.4')
self.assertEqual(res, 'google-public-dns-b.google.com') self.assertEqual(res, 'google-public-dns-b.google.com')
res = DNSUtils.ipToName(IPAddr('2001:4860:4860::8844')) res = DNSUtils.ipToName('2001:4860:4860::8844')
self.assertEqual(res, 'google-public-dns-b.google.com') self.assertEqual(res, 'google-public-dns-b.google.com')
# invalid ip (TEST-NET-1 according to RFC 5737) # invalid ip (TEST-NET-1 according to RFC 5737)
res = DNSUtils.ipToName(IPAddr('192.0.2.0')) res = DNSUtils.ipToName('192.0.2.0')
self.assertEqual(res, None) self.assertEqual(res, None)
def testAddr2bin(self): def testAddr2bin(self):