mirror of https://github.com/fail2ban/fail2ban
new test cases added (increase coverage);
prepared to merge with upstream/master;pull/716/head
parent
819e4eb540
commit
de0beeff9f
|
@ -42,6 +42,9 @@ class AddFailure(unittest.TestCase):
|
|||
|
||||
def testAdd(self):
|
||||
self.assertEqual(self.__banManager.size(), 1)
|
||||
self.assertEqual(self.__banManager.getBanTotal(), 1)
|
||||
self.__banManager.setBanTotal(0)
|
||||
self.assertEqual(self.__banManager.getBanTotal(), 0)
|
||||
|
||||
def testAddDuplicate(self):
|
||||
self.assertFalse(self.__banManager.addBanTicket(self.__ticket))
|
||||
|
@ -55,3 +58,21 @@ class AddFailure(unittest.TestCase):
|
|||
ticket = BanTicket('111.111.1.111', 1167605999.0)
|
||||
self.assertFalse(self.__banManager._inBanList(ticket))
|
||||
|
||||
def testBanTimeIncr(self):
|
||||
ticket = BanTicket(self.__ticket.getIP(), self.__ticket.getTime())
|
||||
## increase twice and at end permanent:
|
||||
for i in (1000, 2000, -1):
|
||||
self.__banManager.addBanTicket(self.__ticket)
|
||||
ticket.setBanTime(i)
|
||||
self.assertFalse(self.__banManager.addBanTicket(ticket))
|
||||
self.assertEqual(str(self.__banManager.getTicketByIP(ticket.getIP())),
|
||||
"BanTicket: ip=%s time=%s bantime=%s bancount=0 #attempts=0 matches=[]" % (ticket.getIP(), ticket.getTime(), i))
|
||||
## after permanent, it should remain permanent ban time (-1):
|
||||
self.__banManager.addBanTicket(self.__ticket)
|
||||
ticket.setBanTime(-1)
|
||||
self.assertFalse(self.__banManager.addBanTicket(ticket))
|
||||
ticket.setBanTime(1000)
|
||||
self.assertFalse(self.__banManager.addBanTicket(ticket))
|
||||
self.assertEqual(str(self.__banManager.getTicketByIP(ticket.getIP())),
|
||||
"BanTicket: ip=%s time=%s bantime=%s bancount=0 #attempts=0 matches=[]" % (ticket.getIP(), ticket.getTime(), -1))
|
||||
|
||||
|
|
|
@ -27,6 +27,10 @@ from threading import Lock
|
|||
from ..server.jail import Jail
|
||||
from ..server.actions import Actions
|
||||
|
||||
class DummyActions(Actions):
|
||||
def checkBan(self):
|
||||
return self._Actions__checkBan()
|
||||
|
||||
class DummyJail(Jail, object):
|
||||
"""A simple 'jail' to suck in all the tickets generated by Filter's
|
||||
"""
|
||||
|
@ -35,7 +39,7 @@ class DummyJail(Jail, object):
|
|||
self.queue = []
|
||||
super(DummyJail, self).__init__(name='DummyJail', backend=backend)
|
||||
self.__db = None
|
||||
self.__actions = Actions(self)
|
||||
self.__actions = DummyActions(self)
|
||||
|
||||
def __len__(self):
|
||||
try:
|
||||
|
|
|
@ -258,11 +258,22 @@ class BanTimeIncrDB(unittest.TestCase):
|
|||
[(banCount, timeOfBan, lastBanTime) for banCount, timeOfBan, lastBanTime in self.db.getBan(ip, '', None, True)],
|
||||
[(2, stime + 15, 20)]
|
||||
)
|
||||
# check other optional parameters of getBan:
|
||||
self.assertEqual(
|
||||
[(banCount, timeOfBan, lastBanTime) for banCount, timeOfBan, lastBanTime in self.db.getBan(ip, forbantime=stime, fromtime=stime)],
|
||||
[(2, stime + 15, 20)]
|
||||
)
|
||||
# search currently banned and 1 day later (nothing should be found):
|
||||
self.assertEqual(
|
||||
self.db.getCurrentBans(forbantime=-24*60*60, fromtime=stime),
|
||||
[]
|
||||
)
|
||||
# search currently banned one ticket for ip:
|
||||
restored_tickets = self.db.getCurrentBans(ip=ip)
|
||||
self.assertEqual(
|
||||
str(restored_tickets),
|
||||
('FailTicket: ip=%s time=%s bantime=20 bancount=2 #attempts=0 matches=[]' % (ip, stime + 15))
|
||||
)
|
||||
# search currently banned anywhere:
|
||||
restored_tickets = self.db.getCurrentBans(fromtime=stime)
|
||||
self.assertEqual(
|
||||
|
@ -482,7 +493,6 @@ class BanTimeIncrDB(unittest.TestCase):
|
|||
obs.add('failureFound', failManager, self.jail, ticket)
|
||||
obs.wait_empty(5)
|
||||
# wait until ticket transfered from failmanager into jail:
|
||||
i = 50
|
||||
while True:
|
||||
ticket2 = jail.getFailTicket()
|
||||
if ticket2:
|
||||
|
@ -505,6 +515,39 @@ class BanTimeIncrDB(unittest.TestCase):
|
|||
self.assertEqual(restored_tickets[0].getBanTime(), 160)
|
||||
self.assertEqual(restored_tickets[0].getBanCount(), 5)
|
||||
|
||||
# now using jail/actions:
|
||||
ticket = FailTicket(ip, stime-60, ['test-expired-ban-time'])
|
||||
jail.putFailTicket(ticket)
|
||||
self.assertFalse(jail.actions.checkBan())
|
||||
|
||||
ticket = FailTicket(ip, MyTime.time(), ['test-actions'])
|
||||
jail.putFailTicket(ticket)
|
||||
self.assertTrue(jail.actions.checkBan())
|
||||
|
||||
obs.wait_empty(5)
|
||||
restored_tickets = self.db.getCurrentBans(jail=jail, fromtime=stime)
|
||||
self.assertEqual(len(restored_tickets), 1)
|
||||
self.assertEqual(restored_tickets[0].getBanTime(), 320)
|
||||
self.assertEqual(restored_tickets[0].getBanCount(), 6)
|
||||
|
||||
# and permanent:
|
||||
ticket = FailTicket(ip+'1', MyTime.time(), ['test-permanent'])
|
||||
ticket.setBanTime(-1)
|
||||
jail.putFailTicket(ticket)
|
||||
self.assertTrue(jail.actions.checkBan())
|
||||
|
||||
obs.wait_empty(5)
|
||||
ticket = FailTicket(ip+'1', MyTime.time(), ['test-permanent'])
|
||||
ticket.setBanTime(600)
|
||||
jail.putFailTicket(ticket)
|
||||
self.assertFalse(jail.actions.checkBan())
|
||||
|
||||
obs.wait_empty(5)
|
||||
restored_tickets = self.db.getCurrentBans(jail=jail, fromtime=stime)
|
||||
self.assertEqual(len(restored_tickets), 2)
|
||||
self.assertEqual(restored_tickets[1].getBanTime(), -1)
|
||||
self.assertEqual(restored_tickets[1].getBanCount(), 1)
|
||||
|
||||
# stop observer
|
||||
obs.stop()
|
||||
|
||||
|
|
|
@ -763,6 +763,14 @@ class TransmitterLogging(TransmitterBase):
|
|||
self.assertEqual(self.transm.proceed(["set", "logtarget", "STDERR"]), (0, "STDERR"))
|
||||
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "flushed"))
|
||||
|
||||
def testBanTimeIncr(self):
|
||||
self.setGetTest("bantime.increment", "true", True, jail=self.jailName)
|
||||
self.setGetTest("bantime.rndtime", "30min", 30*60, jail=self.jailName)
|
||||
self.setGetTest("bantime.maxtime", "1000 days", 1000*24*60*60, jail=self.jailName)
|
||||
self.setGetTest("bantime.factor", "2", "2", jail=self.jailName)
|
||||
self.setGetTest("bantime.formula", "ban.Time * math.exp(float(ban.Count+1)*banFactor)/math.exp(1*banFactor)", jail=self.jailName)
|
||||
self.setGetTest("bantime.multipliers", "1 5 30 60 300 720 1440 2880", "1 5 30 60 300 720 1440 2880", jail=self.jailName)
|
||||
self.setGetTest("bantime.overalljails", "true", "true", jail=self.jailName)
|
||||
|
||||
class JailTests(unittest.TestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue