From faf5b29ffeb9c8e4cd4d9260c4dfab639d81b9ac Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 2 Jan 2012 17:25:08 -0500 Subject: [PATCH] ENH: added a unittest demonstrating the arithm progression sum effect See https://github.com/fail2ban/fail2ban/issues/17 for more information To run that only test (to fail) you can use nose: nosetests -s -v testcases/filtertestcase.py:FileFilterTests.testRecycledLog --- testcases/filtertestcase.py | 63 +++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/testcases/filtertestcase.py b/testcases/filtertestcase.py index 416423b30..c96a490af 100644 --- a/testcases/filtertestcase.py +++ b/testcases/filtertestcase.py @@ -231,3 +231,66 @@ class DNSUtilsTests(unittest.TestCase): for s in bogus: res = DNSUtils.textToIp(s) self.assertEqual(res, []) + + +import os, tempfile + +class FileFilterTests(unittest.TestCase): + + def testRecycledLog(self): + + class RecycledLog: + """Helper class to immitate few kinds of logs + + physical file gets removed in __del__ + """ + def __init__(self, filename, n=None): + """ + Parameters + ---------- + filename : str + Name of the file + n : int or None + if None, no recycling happning so it 'looks' like a + regular log which just appends new lines (although + technically different since it gets rewritten + entirely upon each new entry) + """ + self.filename = filename + self.n = n + self._entries = [] + # initiate the file so it is not empty + open(self.filename, 'w').close() + + def __del__(self): + os.remove(self.filename) + + def write(self, s): + self._entries += [s] + if self.n: + # and now 'shrink' it if necessary + self._entries = self._entries[max(0, len(self._entries)-self.n):] + # rewrite the file with "new" entries + open(self.filename, 'w').write('\n'.join(self._entries)+'\n') + + #logfile = '/tmp/1.dat' + _, logfile = tempfile.mkstemp('-f2b-test') + for n in (0, 3): # shrink file and don't + log = RecycledLog(logfile, n) + + ff = FilterPoll(None) + ff.addLogPath(logfile) + ff.setActive(True) + ff.setFindTime(3600*24*365*20) # long enough to run it simply via nosetests + # ATM without adjusting MyTime + ff.setMaxRetry(2) # fail on having 2 + ff.addFailRegex("FAILED $") + for i in range(10): + log.write("Aug 14 11:59:59 FAILED 192.0.43.%d" % i) + ff.getFailures(logfile) + ff_status = ff.status() + assert(ff_status[0][0] == 'Currently failed') + assert(ff_status[1][0] == 'Total failed') + self.assertEqual(ff_status[0][1], ff_status[1][1]) + #print ff_status + del log