From 398cc73d3daa0a283388b29c44a70107d3b4adbd Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Sat, 30 Jun 2012 00:35:08 -0400 Subject: [PATCH] Added few tests of FileFilter. yet to place them into a Jail-ed execution test At the moment they are, despite being provided different backends, pretty much test FileFilter functionality. --- testcases/filtertestcase.py | 296 ++++++++++++++++++++++++------------ 1 file changed, 196 insertions(+), 100 deletions(-) diff --git a/testcases/filtertestcase.py b/testcases/filtertestcase.py index 67f61ed2..635e5acc 100644 --- a/testcases/filtertestcase.py +++ b/testcases/filtertestcase.py @@ -32,11 +32,82 @@ from server.filter import FileFilter, DNSUtils from server.failmanager import FailManager from server.failmanager import FailManagerEmpty +# +# Useful helpers +# + +def _killfile(f, name): + try: + f.close() + except: + pass + try: + os.unlink(name) + except: + pass + +def _assert_equal_entries(utest, found, output): + """Little helper to unify comparisons with the target entries + + and report helpful failure reports instead of millions of seconds ;) + """ + utest.assertEqual(found[:2], output[:2]) + found_time, output_time = \ + time.localtime(found[2]),\ + time.localtime(output[2]) + utest.assertEqual(found_time, output_time) + if len(output) > 3: # match matches + utest.assertEqual(repr(found[3]), repr(output[3])) + +def _assert_correct_last_attempt(utest, filter_, output): + """Additional helper to wrap most common test case + + Test filter to contain target ticket + """ + ticket = filter_.failManager.toBan() + + attempts = ticket.getAttempt() + date = ticket.getTime() + ip = ticket.getIP() + matches = ticket.getMatches() + found = (ip, attempts, date, matches) + + _assert_equal_entries(utest, found, output) + +def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line=""): + """Copy lines from one file to another (which might be already open) + + Returns open fout + """ + if isinstance(fin, str): + fin = open(fin, 'r') + if isinstance(fout, str): + fout = open(fout, mode) + # Skip + for i in xrange(skip): + _ = fin.readline() + # Read/Write + i = 0 + while n is None or i < n: + l = fin.readline() + if terminal_line is not None and l == terminal_line: + break + fout.write(l) + fout.flush() + i += 1 + # to give other threads possibly some time to crunch + time.sleep(0.1) + return fout + +# +# Actual tests +# + class IgnoreIP(unittest.TestCase): def setUp(self): """Call before every test case.""" - self.__filter = FileFilter(None) + self.filter = FileFilter(None) def tearDown(self): """Call after every test case.""" @@ -44,22 +115,22 @@ class IgnoreIP(unittest.TestCase): def testIgnoreIPOK(self): ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99" for ip in ipList: - self.__filter.addIgnoreIP(ip) + self.filter.addIgnoreIP(ip) - self.assertTrue(self.__filter.inIgnoreIPList(ip)) + self.assertTrue(self.filter.inIgnoreIPList(ip)) # Test DNS - self.__filter.addIgnoreIP("www.epfl.ch") + self.filter.addIgnoreIP("www.epfl.ch") - self.assertTrue(self.__filter.inIgnoreIPList("128.178.50.12")) + self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12")) def testIgnoreIPNOK(self): ipList = "", "999.999.999.999", "abcdef", "192.168.0." for ip in ipList: - self.__filter.addIgnoreIP(ip) - self.assertFalse(self.__filter.inIgnoreIPList(ip)) + self.filter.addIgnoreIP(ip) + self.assertFalse(self.filter.inIgnoreIPList(ip)) # Test DNS - self.__filter.addIgnoreIP("www.epfl.ch") - self.assertFalse(self.__filter.inIgnoreIPList("127.177.50.10")) + self.filter.addIgnoreIP("www.epfl.ch") + self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10")) class LogFile(unittest.TestCase): @@ -68,29 +139,20 @@ class LogFile(unittest.TestCase): def setUp(self): """Call before every test case.""" - self.__filter = FilterPoll(None) - self.__filter.addLogPath(LogFile.FILENAME) + self.filter = FilterPoll(None) + self.filter.addLogPath(LogFile.FILENAME) def tearDown(self): """Call after every test case.""" pass #def testOpen(self): - # self.__filter.openLogFile(LogFile.FILENAME) + # self.filter.openLogFile(LogFile.FILENAME) def testIsModified(self): - self.assertTrue(self.__filter.isModified(LogFile.FILENAME)) + self.assertTrue(self.filter.isModified(LogFile.FILENAME)) -def _killfile(f, name): - try: - f.close() - except: - pass - try: - os.unlink(name) - except: - pass def get_monitor_failures_testcase(Filter_): """Generator of TestCase's for different filters @@ -104,18 +166,33 @@ def get_monitor_failures_testcase(Filter_): self.file = open(self.name, 'a') self.filter = Filter_(None) self.filter.addLogPath(self.name) + self.filter.setActive(True) + self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") def tearDown(self): _killfile(self.file, self.name) + pass def __str__(self): - return "MonitorFailures(%s, %s)" % (self.filter, self.name) - - def isModified(self): - return self.filter.isModified(self.name) - - def testNewChangeViaIsModified(self): + return "MonitorFailures%s(%s)" \ + % (Filter_, hasattr(self, 'name') and self.name or 'tempfile') + + def isModified(self, delay=2.): + """Wait up to `delay` sec to assure that it was modified or not + """ + time0 = time.time() + while time.time() < time0 + delay: + if self.filter.isModified(self.name): + return True + time.sleep(0.1) + return False + + def notModified(self): + # shorter wait time for not modified status + return not self.isModified(0.4) + + def _testNewChangeViaIsModified(self): if not hasattr(self.filter, 'isModified'): raise unittest.SkipTest( "%s does not have isModified (present only in poll atm" @@ -123,33 +200,78 @@ def get_monitor_failures_testcase(Filter_): # it is a brand new one -- so first we think it is modified self.assertTrue(self.isModified()) # but not any longer - self.assertFalse(self.isModified()) - self.assertFalse(self.isModified()) + self.assertTrue(self.notModified()) + self.assertTrue(self.notModified()) for i in range(4): # few changes # unless we write into it self.file.write("line%d\n" % i) self.file.flush() - time.sleep(0.1) # just give it some time to pull may be self.assertTrue(self.isModified()) - self.assertFalse(self.isModified()) + self.assertTrue(self.notModified()) os.rename(self.name, self.name + '.old') - time.sleep(0.1) # just give it some time to pull may be # we are not signaling as modified whenever # it gets away - self.assertFalse(self.isModified()) + self.assertTrue(self.notModified()) f = open(self.name, 'a') - time.sleep(0.1) # just give it some time to pull may be self.assertTrue(self.isModified()) - self.assertFalse(self.isModified()) + self.assertTrue(self.notModified()) f.write("line%d\n" % i) f.flush() - time.sleep(0.1) # just give it some time to pull may be self.assertTrue(self.isModified()) - self.assertFalse(self.isModified()) + self.assertTrue(self.notModified()) _killfile(f, self.name) _killfile(self.name, self.name + '.old') pass + def testNewChangeViaGetFailures_simple(self): + # suck in lines from this sample log file + self.filter.getFailures(self.name) + self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) + + # Now let's feed it with entries from the file + _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5) + self.filter.getFailures(self.name) + self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) + # and it should have not been enough + + _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5) + self.filter.getFailures(self.name) + _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) + + def testNewChangeViaGetFailures_rewrite(self): + # + # if we rewrite the file at once + self.file.close() + _copy_lines_between_files(GetFailures.FILENAME_01, self.name) + self.filter.getFailures(self.name) + _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) + + # What if file gets overriden + # yoh: skip so we skip those 2 identical lines which our + # filter "marked" as the known beginning, otherwise it + # would not detect "rotation" + self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, + skip=3, mode='w') + self.filter.getFailures(self.name) + #self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) + _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) + + def testNewChangeViaGetFailures_move(self): + # + # if we move file into a new location while it has been open already + self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, + n=14, mode='w') + self.filter.getFailures(self.name) + self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) + self.assertEqual(self.filter.failManager.getFailTotal(), 2) + + # move aside, but leaving the handle still open... + os.rename(self.name, self.name + '.bak') + _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14) + self.filter.getFailures(self.name) + _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) + self.assertEqual(self.filter.failManager.getFailTotal(), 3) + return MonitorFailures @@ -161,54 +283,28 @@ class GetFailures(unittest.TestCase): FILENAME_04 = "testcases/files/testcase04.log" FILENAME_USEDNS = "testcases/files/testcase-usedns.log" + # so that they could be reused by other tests + FAILURES_01 = ('193.168.0.128', 3, 1124013599.0, + ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3) + def setUp(self): """Call before every test case.""" - self.__filter = FileFilter(None) - self.__filter.setActive(True) + self.filter = FileFilter(None) + self.filter.setActive(True) # TODO Test this - #self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}") - #self.__filter.setTimePattern("%b %d %H:%M:%S") + #self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}") + #self.filter.setTimePattern("%b %d %H:%M:%S") def tearDown(self): """Call after every test case.""" - def _assertEqualEntries(self, found, output): - """Little helper to unify comparisons with the target entries - - and report helpful failure reports instead of millions of seconds ;) - """ - self.assertEqual(found[:2], output[:2]) - found_time, output_time = \ - time.localtime(found[2]),\ - time.localtime(output[2]) - self.assertEqual(found_time, output_time) - if len(output) > 3: # match matches - self.assertEqual(repr(found[3]), repr(output[3])) - - def _assertCorrectLastAtempt(self, filter_, output): - """Additional helper to wrap most common test case - - Test filter to contain target ticket - """ - ticket = filter_.failManager.toBan() - - attempts = ticket.getAttempt() - date = ticket.getTime() - ip = ticket.getIP() - matches = ticket.getMatches() - found = (ip, attempts, date, matches) - - self._assertEqualEntries(found, output) def testGetFailures01(self): - output = ('193.168.0.128', 3, 1124013599.0, - ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3) - - self.__filter.addLogPath(GetFailures.FILENAME_01) - self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") - self.__filter.getFailures(GetFailures.FILENAME_01) - self._assertCorrectLastAtempt(self.__filter, output) + self.filter.addLogPath(GetFailures.FILENAME_01) + self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") + self.filter.getFailures(GetFailures.FILENAME_01) + _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) def testGetFailures02(self): @@ -216,30 +312,30 @@ class GetFailures(unittest.TestCase): ['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n' % m for m in 53, 54, 57, 58]) - self.__filter.addLogPath(GetFailures.FILENAME_02) - self.__filter.addFailRegex("Failed .* from ") - self.__filter.getFailures(GetFailures.FILENAME_02) - self._assertCorrectLastAtempt(self.__filter, output) + self.filter.addLogPath(GetFailures.FILENAME_02) + self.filter.addFailRegex("Failed .* from ") + self.filter.getFailures(GetFailures.FILENAME_02) + _assert_correct_last_attempt(self, self.filter, output) def testGetFailures03(self): output = ('203.162.223.135', 6, 1124013544.0) - self.__filter.addLogPath(GetFailures.FILENAME_03) - self.__filter.addFailRegex("error,relay=,.*550 User unknown") - self.__filter.getFailures(GetFailures.FILENAME_03) - self._assertCorrectLastAtempt(self.__filter, output) + self.filter.addLogPath(GetFailures.FILENAME_03) + self.filter.addFailRegex("error,relay=,.*550 User unknown") + self.filter.getFailures(GetFailures.FILENAME_03) + _assert_correct_last_attempt(self, self.filter, output) def testGetFailures04(self): output = [('212.41.96.186', 4, 1124013600.0), ('212.41.96.185', 4, 1124013598.0)] - self.__filter.addLogPath(GetFailures.FILENAME_04) - self.__filter.addFailRegex("Invalid user .* ") - self.__filter.getFailures(GetFailures.FILENAME_04) + self.filter.addLogPath(GetFailures.FILENAME_04) + self.filter.addFailRegex("Invalid user .* ") + self.filter.getFailures(GetFailures.FILENAME_04) try: for i, out in enumerate(output): - self._assertCorrectLastAtempt(self.__filter, out) + _assert_correct_last_attempt(self, self.filter, out) except FailManagerEmpty: pass @@ -266,30 +362,30 @@ class GetFailures(unittest.TestCase): filter_.addLogPath(GetFailures.FILENAME_USEDNS) filter_.addFailRegex("Failed .* from ") filter_.getFailures(GetFailures.FILENAME_USEDNS) - self._assertCorrectLastAtempt(filter_, output) + _assert_correct_last_attempt(self, filter_, output) def testGetFailuresMultiRegex(self): output = ('141.3.81.106', 8, 1124013541.0) - self.__filter.addLogPath(GetFailures.FILENAME_02) - self.__filter.addFailRegex("Failed .* from ") - self.__filter.addFailRegex("Accepted .* from ") - self.__filter.getFailures(GetFailures.FILENAME_02) - self._assertCorrectLastAtempt(self.__filter, output) + self.filter.addLogPath(GetFailures.FILENAME_02) + self.filter.addFailRegex("Failed .* from ") + self.filter.addFailRegex("Accepted .* from ") + self.filter.getFailures(GetFailures.FILENAME_02) + _assert_correct_last_attempt(self, self.filter, output) def testGetFailuresIgnoreRegex(self): output = ('141.3.81.106', 8, 1124013541.0) - self.__filter.addLogPath(GetFailures.FILENAME_02) - self.__filter.addFailRegex("Failed .* from ") - self.__filter.addFailRegex("Accepted .* from ") - self.__filter.addIgnoreRegex("for roehl") + self.filter.addLogPath(GetFailures.FILENAME_02) + self.filter.addFailRegex("Failed .* from ") + self.filter.addFailRegex("Accepted .* from ") + self.filter.addIgnoreRegex("for roehl") - self.__filter.getFailures(GetFailures.FILENAME_02) + self.filter.getFailures(GetFailures.FILENAME_02) - self.assertRaises(FailManagerEmpty, self.__filter.failManager.toBan) + self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) class DNSUtilsTests(unittest.TestCase):