diff --git a/fail2ban/server/datedetector.py b/fail2ban/server/datedetector.py index 42df308e..13d70699 100644 --- a/fail2ban/server/datedetector.py +++ b/fail2ban/server/datedetector.py @@ -26,7 +26,8 @@ import time from threading import Lock -from .datetemplate import re, DateTemplate, DatePatternRegex, DateTai64n, DateEpoch +from .datetemplate import re, DateTemplate, DatePatternRegex, DateTai64n, DateEpoch, \ + RE_EPOCH_PATTERN from .strptime import validateTimeZone from .utils import Utils from ..helpers import getLogger @@ -36,7 +37,7 @@ logSys = getLogger(__name__) logLevel = 6 -RE_DATE_PREMATCH = re.compile("\{DATE\}", re.IGNORECASE) +RE_DATE_PREMATCH = re.compile(r"(?(?<=^\[))|(?P(?<=\baudit\()))\d{10,11}\b(?:\.\d{3,6})?)(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))" + self._longFrm = longFrm; + epochRE = r"\d{10,11}\b(?:\.\d{3,6})?" + if longFrm: + self.name = "LongEpoch"; + epochRE = r"\d{10,11}(?:\d{3}(?:\d{3})?)?" + if pattern: + regex = RE_EPOCH_PATTERN.sub("(%s)" % epochRE, pattern) + self.setRegex(regex) + elif not lineBeginOnly: + regex = r"((?:^|(?P(?<=^\[))|(?P(?<=\baudit\()))%s)(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))" % epochRE self.setRegex(regex, wordBegin=False) ;# already line begin resp. word begin anchored else: - regex = r"((?P(?<=^\[))?\d{10,11}\b(?:\.\d{3,6})?)(?(square)(?=\]))" + regex = r"((?P(?<=^\[))?%s)(?(square)(?=\]))" % epochRE self.setRegex(regex, wordBegin='start', wordEnd=True) def getDate(self, line, dateMatch=None, default_tz=None): @@ -220,8 +231,14 @@ class DateEpoch(DateTemplate): if not dateMatch: dateMatch = self.matchDate(line) if dateMatch: + v = dateMatch.group(1) # extract part of format which represents seconds since epoch - return (float(dateMatch.group(1)), dateMatch) + if self._longFrm and len(v) >= 13: + if len(v) >= 16: + v = float(v) / 1000000 + else: + v = float(v) / 1000 + return (float(v), dateMatch) class DatePatternRegex(DateTemplate): diff --git a/fail2ban/tests/datedetectortestcase.py b/fail2ban/tests/datedetectortestcase.py index 02facf30..69473c9d 100644 --- a/fail2ban/tests/datedetectortestcase.py +++ b/fail2ban/tests/datedetectortestcase.py @@ -77,6 +77,48 @@ class DateDetectorTest(LogCaptureTestCase): log = date + " [sshd] error: PAM: Authentication failure" datelog = self.datedetector.getTime(log) self.assertFalse(datelog) + + def testGetEpochMsTime(self): + self.__datedetector = DateDetector() + self.__datedetector.appendTemplate('LEPOCH') + # correct short/long epoch time, using all variants: + for fact in (1, 1000, 1000000): + for dateUnix in (1138049999, 32535244799): + for date in ("%s", "[%s]", "[%s]", "audit(%s:101)"): + dateLong = dateUnix * fact + date = date % dateLong + log = date + " [sshd] error: PAM: Authentication failure" + datelog = self.datedetector.getTime(log) + self.assertTrue(datelog, "Parse epoch time for %s failed" % (date,)) + ( datelog, matchlog ) = datelog + self.assertEqual(int(datelog), dateUnix) + self.assertEqual(matchlog.group(1), str(dateLong)) + # wrong, no epoch time (< 10 digits, more as 17 digits, begin/end of word) : + for dateUnix in ('123456789', '999999999999999999', '1138049999A', 'A1138049999'): + for date in ("%s", "[%s]", "[%s.555]", "audit(%s.555:101)"): + date = date % dateUnix + log = date + " [sshd] error: PAM: Authentication failure" + datelog = self.datedetector.getTime(log) + self.assertFalse(datelog) + + def testGetEpochPattern(self): + self.__datedetector = DateDetector() + self.__datedetector.appendTemplate('\|\s{LEPOCH}(?=\s\|)') + # correct short/long epoch time, using all variants: + for fact in (1, 1000, 1000000): + for dateUnix in (1138049999, 32535244799): + dateLong = dateUnix * fact + log = "auth-error | %s | invalid password" % dateLong + datelog = self.datedetector.getTime(log) + self.assertTrue(datelog, "Parse epoch time failed: %r" % (log,)) + ( datelog, matchlog ) = datelog + self.assertEqual(int(datelog), dateUnix) + self.assertEqual(matchlog.group(1), str(dateLong)) + # wrong epoch time format (does not match pattern): + for log in ("test%s123", "test-right | %stest", "test%s | test-left"): + log = log % dateLong + datelog = self.datedetector.getTime(log) + self.assertFalse(datelog) def testGetTime(self): log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure"