mirror of https://github.com/fail2ban/fail2ban
ENH: added a unittest demonstrating the arithm progression sum effect
See https://github.com/fail2ban/fail2ban/issues/17 for more information To run that only test (to fail) you can use nose: nosetests -s -v testcases/filtertestcase.py:FileFilterTests.testRecycledLog_tent/cycle-log
parent
a7d47e8b36
commit
faf5b29ffe
|
@ -231,3 +231,66 @@ class DNSUtilsTests(unittest.TestCase):
|
|||
for s in bogus:
|
||||
res = DNSUtils.textToIp(s)
|
||||
self.assertEqual(res, [])
|
||||
|
||||
|
||||
import os, tempfile
|
||||
|
||||
class FileFilterTests(unittest.TestCase):
|
||||
|
||||
def testRecycledLog(self):
|
||||
|
||||
class RecycledLog:
|
||||
"""Helper class to immitate few kinds of logs
|
||||
|
||||
physical file gets removed in __del__
|
||||
"""
|
||||
def __init__(self, filename, n=None):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
filename : str
|
||||
Name of the file
|
||||
n : int or None
|
||||
if None, no recycling happning so it 'looks' like a
|
||||
regular log which just appends new lines (although
|
||||
technically different since it gets rewritten
|
||||
entirely upon each new entry)
|
||||
"""
|
||||
self.filename = filename
|
||||
self.n = n
|
||||
self._entries = []
|
||||
# initiate the file so it is not empty
|
||||
open(self.filename, 'w').close()
|
||||
|
||||
def __del__(self):
|
||||
os.remove(self.filename)
|
||||
|
||||
def write(self, s):
|
||||
self._entries += [s]
|
||||
if self.n:
|
||||
# and now 'shrink' it if necessary
|
||||
self._entries = self._entries[max(0, len(self._entries)-self.n):]
|
||||
# rewrite the file with "new" entries
|
||||
open(self.filename, 'w').write('\n'.join(self._entries)+'\n')
|
||||
|
||||
#logfile = '/tmp/1.dat'
|
||||
_, logfile = tempfile.mkstemp('-f2b-test')
|
||||
for n in (0, 3): # shrink file and don't
|
||||
log = RecycledLog(logfile, n)
|
||||
|
||||
ff = FilterPoll(None)
|
||||
ff.addLogPath(logfile)
|
||||
ff.setActive(True)
|
||||
ff.setFindTime(3600*24*365*20) # long enough to run it simply via nosetests
|
||||
# ATM without adjusting MyTime
|
||||
ff.setMaxRetry(2) # fail on having 2
|
||||
ff.addFailRegex("FAILED <HOST>$")
|
||||
for i in range(10):
|
||||
log.write("Aug 14 11:59:59 FAILED 192.0.43.%d" % i)
|
||||
ff.getFailures(logfile)
|
||||
ff_status = ff.status()
|
||||
assert(ff_status[0][0] == 'Currently failed')
|
||||
assert(ff_status[1][0] == 'Total failed')
|
||||
self.assertEqual(ff_status[0][1], ff_status[1][1])
|
||||
#print ff_status
|
||||
del log
|
||||
|
|
Loading…
Reference in New Issue