diff --git a/fail2ban-testcases b/fail2ban-testcases index a6e473d8..528659ac 100755 --- a/fail2ban-testcases +++ b/fail2ban-testcases @@ -83,11 +83,6 @@ print "Fail2ban " + version + " test suite. Please wait..." tests = unittest.TestSuite() -# Filter -tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP)) -tests.addTest(unittest.makeSuite(filtertestcase.LogFile)) -tests.addTest(unittest.makeSuite(filtertestcase.GetFailures)) -tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests)) # Server #tests.addTest(unittest.makeSuite(servertestcase.StartStop)) #tests.addTest(unittest.makeSuite(servertestcase.Transmitter)) @@ -98,6 +93,40 @@ tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure)) tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure)) # ClientReader tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest)) + +from server.filterpoll import FilterPoll +filters = [FilterPoll] # always available + +# Additional filters available only if external modules are available +# yoh: Since I do not know better way for parametric tests +# with good old unittest +try: + from server.filtergamin import FilterGamin + #filters.append(FilterGamin) +except: + pass + +try: + from server.filterpyinotify import FilterPyinotify + filters.append(FilterPyinotify) +except: + pass + +for Filter_ in filters: + tests.addTest(unittest.makeSuite( + filtertestcase.get_monitor_failures_testcase(Filter_))) + +# yoh: adding them (in particular datadetectortestscase before above +# get_monitor_failures_testcase's makes them fail (probably due +# to additional thread making it busier or smth like +# that)... TODO + +# Filter +tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP)) +tests.addTest(unittest.makeSuite(filtertestcase.LogFile)) +tests.addTest(unittest.makeSuite(filtertestcase.GetFailures)) +tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests)) + # DateDetector tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest)) diff --git a/testcases/filtertestcase.py b/testcases/filtertestcase.py index 5e16f7d3..67f61ed2 100644 --- a/testcases/filtertestcase.py +++ b/testcases/filtertestcase.py @@ -23,7 +23,9 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko" __license__ = "GPL" import unittest +import os import time +import tempfile from server.filterpoll import FilterPoll from server.filter import FileFilter, DNSUtils @@ -43,9 +45,11 @@ class IgnoreIP(unittest.TestCase): ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99" for ip in ipList: self.__filter.addIgnoreIP(ip) + self.assertTrue(self.__filter.inIgnoreIPList(ip)) # Test DNS self.__filter.addIgnoreIP("www.epfl.ch") + self.assertTrue(self.__filter.inIgnoreIPList("128.178.50.12")) def testIgnoreIPNOK(self): @@ -78,6 +82,77 @@ class LogFile(unittest.TestCase): self.assertTrue(self.__filter.isModified(LogFile.FILENAME)) +def _killfile(f, name): + try: + f.close() + except: + pass + try: + os.unlink(name) + except: + pass + +def get_monitor_failures_testcase(Filter_): + """Generator of TestCase's for different filters + """ + + class MonitorFailures(unittest.TestCase): + def setUp(self): + """Call before every test case.""" + self.filter = self.name = 'NA' + _, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures') + self.file = open(self.name, 'a') + self.filter = Filter_(None) + self.filter.addLogPath(self.name) + + + def tearDown(self): + _killfile(self.file, self.name) + + def __str__(self): + return "MonitorFailures(%s, %s)" % (self.filter, self.name) + + def isModified(self): + return self.filter.isModified(self.name) + + def testNewChangeViaIsModified(self): + if not hasattr(self.filter, 'isModified'): + raise unittest.SkipTest( + "%s does not have isModified (present only in poll atm" + % (self.filter,)) + # it is a brand new one -- so first we think it is modified + self.assertTrue(self.isModified()) + # but not any longer + self.assertFalse(self.isModified()) + self.assertFalse(self.isModified()) + for i in range(4): # few changes + # unless we write into it + self.file.write("line%d\n" % i) + self.file.flush() + time.sleep(0.1) # just give it some time to pull may be + self.assertTrue(self.isModified()) + self.assertFalse(self.isModified()) + os.rename(self.name, self.name + '.old') + time.sleep(0.1) # just give it some time to pull may be + # we are not signaling as modified whenever + # it gets away + self.assertFalse(self.isModified()) + f = open(self.name, 'a') + time.sleep(0.1) # just give it some time to pull may be + self.assertTrue(self.isModified()) + self.assertFalse(self.isModified()) + f.write("line%d\n" % i) + f.flush() + time.sleep(0.1) # just give it some time to pull may be + self.assertTrue(self.isModified()) + self.assertFalse(self.isModified()) + _killfile(f, self.name) + _killfile(self.name, self.name + '.old') + pass + + return MonitorFailures + + class GetFailures(unittest.TestCase): FILENAME_01 = "testcases/files/testcase01.log"