From b82f584a96fd3a19b157fc85c51a72c952809713 Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 20 Aug 2020 19:33:40 +0200 Subject: [PATCH] added test case covering new date handling (simulation, unknown format, warnings, etc) --- fail2ban/server/filter.py | 9 +++---- fail2ban/tests/filtertestcase.py | 44 +++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 40bfeef5..3ea0a601 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -651,6 +651,7 @@ class Filter(JailThread): else: # in initialization (restore) phase, if too old - ignore: if date is not None and date < MyTime.time() - self.getFindTime(): + print('**********') # log time zone issue as warning once per day: self._logWarnOnce("_next_ignByTimeWarn", ("Ignore line since time %s < %s - %s", @@ -860,12 +861,8 @@ class Filter(JailThread): self._logWarnOnce("_next_noTimeWarn", ("Found a match but no valid date/time found for %r.", tupleLine[1]), ("Match without a timestamp: %s", "\n".join(failRegex.getMatchedLines())), - ("Please try setting a custom " - "date pattern (see man page jail.conf(5)). " - "If format is complex, please " - "file a detailed issue on" - " https://github.com/fail2ban/fail2ban/issues " - "in order to get support for this format.",)) + ("Please try setting a custom date pattern (see man page jail.conf(5)).",) + ) if date is None and self.checkFindTime: continue # we should check all regex (bypass on multi-line, otherwise too complex): if not self.checkAllRegex or self.__lineBufferSize > 1: diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 63f43b21..2dac91d1 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -394,12 +394,13 @@ class IgnoreIP(LogCaptureTestCase): finally: tearDownMyTime() - def testTimeJump(self): + def _testTimeJump(self, inOperation=False): try: self.filter.addFailRegex('^') self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s') self.filter.setFindTime(10); # max 10 seconds back self.filter.setMaxRetry(5); # don't ban here + self.filter.inOperation = inOperation # self.pruneLog('[phase 1] DST time jump') # check local time jump (DST hole): @@ -430,6 +431,47 @@ class IgnoreIP(LogCaptureTestCase): self.assertNotLogged('Ignore line') finally: tearDownMyTime() + def testTimeJump(self): + self._testTimeJump(inOperation=False) + def testTimeJump_InOperation(self): + self._testTimeJump(inOperation=True) + + def testWrongTimeZone(self): + try: + self.filter.addFailRegex('fail from $') + self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s') + self.filter.setMaxRetry(5); # don't ban here + self.filter.inOperation = True; # real processing (all messages are new) + # current time is 1h later than log-entries: + MyTime.setTime(1572138000+3600) + # + self.pruneLog("[phase 1] simulate wrong TZ") + for i in (1,2,3): + self.filter.processLineAndAdd('2019-10-27 02:00:00 fail from 192.0.2.15'); # +3 = 3 + self.assertLogged( + "Simulate NOW in operation since found time has too large deviation", + "Please check jail has possibly a timezone issue.", + "192.0.2.15:1", "192.0.2.15:2", "192.0.2.15:3", + "Total # of detected failures: 3.", wait=True) + # + self.pruneLog("[phase 2] wrong TZ given in log") + for i in (1,2,3): + self.filter.processLineAndAdd('2019-10-27 04:00:00 GMT fail from 192.0.2.16'); # +3 = 6 + self.assertLogged( + "192.0.2.16:1", "192.0.2.16:2", "192.0.2.16:3", + "Total # of detected failures: 6.", all=True, wait=True) + self.assertNotLogged("Found a match but no valid date/time found") + # + self.pruneLog("[phase 3] other timestamp (don't match datepattern), regex matches") + for i in range(3): + self.filter.processLineAndAdd('27.10.2019 04:00:00 fail from 192.0.2.17'); # +3 = 9 + self.assertLogged( + "Found a match but no valid date/time found", + "Match without a timestamp:", + "192.0.2.17:1", "192.0.2.17:2", "192.0.2.17:3", + "Total # of detected failures: 9.", all=True, wait=True) + finally: + tearDownMyTime() def testAddAttempt(self): self.filter.setMaxRetry(3)