mirror of https://github.com/fail2ban/fail2ban
test cases extended with memory leakage check
parent
f7cc55103c
commit
a10eb39bbe
|
@ -32,6 +32,7 @@ from ..helpers import getLogger
|
|||
|
||||
# Gets the instance of the logger.
|
||||
logSys = getLogger(__name__)
|
||||
logLevel = logging.DEBUG
|
||||
|
||||
|
||||
class FailManager:
|
||||
|
@ -93,13 +94,13 @@ class FailManager:
|
|||
attempts = fData.getRetry()
|
||||
self.__failTotal += 1
|
||||
|
||||
if logSys.getEffectiveLevel() <= logging.DEBUG:
|
||||
if logSys.getEffectiveLevel() <= logLevel:
|
||||
# yoh: Since composing this list might be somewhat time consuming
|
||||
# in case of having many active failures, it should be ran only
|
||||
# if debug level is "low" enough
|
||||
failures_summary = ', '.join(['%s:%d' % (k, v.getRetry())
|
||||
for k,v in self.__failList.iteritems()])
|
||||
logSys.debug("Total # of detected failures: %d. Current failures from %d IPs (IP:count): %s"
|
||||
logSys.log(logLevel, "Total # of detected failures: %d. Current failures from %d IPs (IP:count): %s"
|
||||
% (self.__failTotal, len(self.__failList), failures_summary))
|
||||
return attempts
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ __license__ = "GPL"
|
|||
|
||||
import unittest
|
||||
|
||||
from ..server import failmanager
|
||||
from ..server.failmanager import FailManager, FailManagerEmpty
|
||||
from ..server.ticket import FailTicket
|
||||
|
||||
|
@ -120,3 +121,107 @@ class AddFailure(unittest.TestCase):
|
|||
ticket = self.__failManager.toBan()
|
||||
self.assertNotEqual(ticket.getIP(), "100.100.10.10")
|
||||
self.assertRaises(FailManagerEmpty, self.__failManager.toBan)
|
||||
|
||||
|
||||
class FailmanagerComplex(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Call before every test case."""
|
||||
super(FailmanagerComplex, self).setUp()
|
||||
self.__failManager = FailManager()
|
||||
# down logging level for all this tests, because of extremely large failure count (several GB on heavydebug)
|
||||
self.__saved_ll = failmanager.logLevel
|
||||
failmanager.logLevel = 3
|
||||
|
||||
def tearDown(self):
|
||||
super(FailmanagerComplex, self).tearDown()
|
||||
# restore level
|
||||
failmanager.logLevel = self.__saved_ll
|
||||
|
||||
@staticmethod
|
||||
def _ip_range(maxips):
|
||||
class _ip(list):
|
||||
def __str__(self):
|
||||
return '.'.join(map(str, self))
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
def __key__(self):
|
||||
return str(self)
|
||||
def __hash__(self):
|
||||
#return (int)(struct.unpack('I', struct.pack("BBBB",*self))[0])
|
||||
return (int)(self[0] << 24 | self[1] << 16 | self[2] << 8 | self[3])
|
||||
i = 0
|
||||
c = [127,0,0,0]
|
||||
while i < maxips:
|
||||
for n in range(3,0,-1):
|
||||
if c[n] < 255:
|
||||
c[n] += 1
|
||||
break
|
||||
c[n] = 0
|
||||
yield (i, _ip(c))
|
||||
i += 1
|
||||
|
||||
def testCheckIPGenerator(self):
|
||||
for i, ip in self._ip_range(65536 if not unittest.F2B.fast else 1000):
|
||||
if i == 254:
|
||||
self.assertEqual(str(ip), '127.0.0.255')
|
||||
elif i == 255:
|
||||
self.assertEqual(str(ip), '127.0.1.0')
|
||||
elif i == 1000:
|
||||
self.assertEqual(str(ip), '127.0.3.233')
|
||||
elif i == 65534:
|
||||
self.assertEqual(str(ip), '127.0.255.255')
|
||||
elif i == 65535:
|
||||
self.assertEqual(str(ip), '127.1.0.0')
|
||||
|
||||
def testFailuresMemLeak1(self):
|
||||
# use factor (divisor) instead unittest.F2B.SkipIfFast() :
|
||||
modeDiv = 1
|
||||
if unittest.F2B.fast: # pragma: no cover
|
||||
modeDiv = 10
|
||||
self.__failManager.setMaxTime(self.__failManager.getMaxTime() // modeDiv)
|
||||
import gc
|
||||
gc.collect()
|
||||
timestamp = 1167606999.0
|
||||
ticktime = timestamp-(1000 // modeDiv)
|
||||
for i, ip in self._ip_range(30000 // modeDiv):
|
||||
t = FailTicket(ip, ticktime, list({'match': i}))
|
||||
ticktime += 1
|
||||
self.__failManager.addFailure(t)
|
||||
if i % (500 // modeDiv) == 0:
|
||||
self.__failManager.cleanup(timestamp)
|
||||
if i % (888 // modeDiv) == 0:
|
||||
timestamp += (1000 // modeDiv)
|
||||
ticktime = timestamp-(1000 // modeDiv)
|
||||
self.assertFalse(gc.collect())
|
||||
self.__failManager.cleanup(timestamp)
|
||||
ticktime -= timestamp - self.__failManager.getMaxTime() + 1
|
||||
self.assertEqual(self.__failManager.size(), ticktime if ticktime > 0 else 0)
|
||||
self.assertFalse(gc.collect())
|
||||
|
||||
def testFailuresMemLeak2(self):
|
||||
# use factor (divisor) instead unittest.F2B.SkipIfFast() :
|
||||
modeDiv = 1
|
||||
if unittest.F2B.fast: # pragma: no cover
|
||||
modeDiv = 10
|
||||
self.__failManager.setMaxTime(self.__failManager.getMaxTime() // modeDiv)
|
||||
import gc
|
||||
gc.collect()
|
||||
timestamp = 1167606999.0
|
||||
ticktime = timestamp-(1000 // modeDiv)
|
||||
for i, ip in self._ip_range(10000 // modeDiv):
|
||||
for j in range(0, 5):
|
||||
t = FailTicket(ip, ticktime, list({'match': i}))
|
||||
ticktime += 1
|
||||
self.__failManager.addFailure(t)
|
||||
if i % (500 // modeDiv) == 0:
|
||||
self.__failManager.cleanup(timestamp)
|
||||
if i % (888 // modeDiv) == 0 or i % (1500 // modeDiv) == 0:
|
||||
timestamp += (1000 // modeDiv)
|
||||
ticktime = timestamp-(1000 // modeDiv)
|
||||
self.assertFalse(gc.collect())
|
||||
timestamp += 20000
|
||||
self.__failManager.cleanup(timestamp)
|
||||
self.assertEqual(self.__failManager.size(), 0)
|
||||
self.assertFalse(gc.collect())
|
||||
|
||||
|
|
|
@ -176,6 +176,7 @@ def gatherTests(regexps=None, opts=None):
|
|||
tests.addTest(unittest.makeSuite(tickettestcase.TicketTests))
|
||||
# FailManager
|
||||
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
|
||||
tests.addTest(unittest.makeSuite(failmanagertestcase.FailmanagerComplex))
|
||||
# BanManager
|
||||
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue