Merge pull request #2038 from sebres/long-epoch-and-epoch-pattern

Long epoch and epoch pattern
pull/2048/head
Sergey G. Brester 2018-01-31 12:13:46 +01:00 committed by GitHub
commit 0e0e478483
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 13 deletions

View File

@ -26,7 +26,8 @@ import time
from threading import Lock
from .datetemplate import re, DateTemplate, DatePatternRegex, DateTai64n, DateEpoch
from .datetemplate import re, DateTemplate, DatePatternRegex, DateTai64n, DateEpoch, \
RE_EPOCH_PATTERN
from .strptime import validateTimeZone
from .utils import Utils
from ..helpers import getLogger
@ -36,7 +37,7 @@ logSys = getLogger(__name__)
logLevel = 6
RE_DATE_PREMATCH = re.compile("\{DATE\}", re.IGNORECASE)
RE_DATE_PREMATCH = re.compile(r"(?<!\\)\{DATE\}", re.IGNORECASE)
DD_patternCache = Utils.Cache(maxCount=1000, maxTime=60*60)
@ -48,12 +49,18 @@ def _getPatternTemplate(pattern, key=None):
template = DD_patternCache.get(key)
if not template:
if key in ("EPOCH", "{^LN-BEG}EPOCH", "^EPOCH"):
template = DateEpoch(lineBeginOnly=(key != "EPOCH"))
elif key in ("TAI64N", "{^LN-BEG}TAI64N", "^TAI64N"):
template = DateTai64n(wordBegin=('start' if key != "TAI64N" else False))
else:
template = DatePatternRegex(pattern)
if "EPOCH" in key:
if RE_EPOCH_PATTERN.search(pattern):
template = DateEpoch(pattern=pattern, longFrm="LEPOCH" in key)
elif key in ("EPOCH", "{^LN-BEG}EPOCH", "^EPOCH"):
template = DateEpoch(lineBeginOnly=(key != "EPOCH"))
elif key in ("LEPOCH", "{^LN-BEG}LEPOCH", "^LEPOCH"):
template = DateEpoch(lineBeginOnly=(key != "LEPOCH"), longFrm=True)
if template is None:
if key in ("TAI64N", "{^LN-BEG}TAI64N", "^TAI64N"):
template = DateTai64n(wordBegin=('start' if key != "TAI64N" else False))
else:
template = DatePatternRegex(pattern)
DD_patternCache.set(key, template)
return template

View File

@ -47,6 +47,9 @@ RE_LINE_BOUND_END = re.compile(r'(?<![\\\|])(?:\$\)?)$')
RE_ALPHA_PATTERN = re.compile(r'(?<!\%)\%[aAbBpc]')
RE_EPOCH_PATTERN = re.compile(r"(?<!\\)\{L?EPOCH\}", re.IGNORECASE)
class DateTemplate(object):
"""A template which searches for and returns a date from a log line.
@ -192,14 +195,25 @@ class DateEpoch(DateTemplate):
regex
"""
def __init__(self, lineBeginOnly=False):
def __init__(self, lineBeginOnly=False, pattern=None, longFrm=False):
DateTemplate.__init__(self)
self.name = "Epoch"
if not lineBeginOnly:
regex = r"((?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=\baudit\()))\d{10,11}\b(?:\.\d{3,6})?)(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))"
self._longFrm = longFrm;
self._grpIdx = 1
epochRE = r"\d{10,11}\b(?:\.\d{3,6})?"
if longFrm:
self.name = "LongEpoch";
epochRE = r"\d{10,11}(?:\d{3}(?:\.\d{1,6}|\d{3})?)?"
if pattern:
# pattern should capture/cut out the whole match:
regex = "(" + RE_EPOCH_PATTERN.sub(lambda v: "(%s)" % epochRE, pattern) + ")"
self._grpIdx = 2
self.setRegex(regex)
elif not lineBeginOnly:
regex = r"((?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=\baudit\()))%s)(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))" % epochRE
self.setRegex(regex, wordBegin=False) ;# already line begin resp. word begin anchored
else:
regex = r"((?P<square>(?<=^\[))?\d{10,11}\b(?:\.\d{3,6})?)(?(square)(?=\]))"
regex = r"((?P<square>(?<=^\[))?%s)(?(square)(?=\]))" % epochRE
self.setRegex(regex, wordBegin='start', wordEnd=True)
def getDate(self, line, dateMatch=None, default_tz=None):
@ -220,8 +234,14 @@ class DateEpoch(DateTemplate):
if not dateMatch:
dateMatch = self.matchDate(line)
if dateMatch:
v = dateMatch.group(self._grpIdx)
# extract part of format which represents seconds since epoch
return (float(dateMatch.group(1)), dateMatch)
if self._longFrm and len(v) >= 13:
if len(v) >= 16 and '.' not in v:
v = float(v) / 1000000
else:
v = float(v) / 1000
return (float(v), dateMatch)
class DatePatternRegex(DateTemplate):

View File

@ -77,6 +77,48 @@ class DateDetectorTest(LogCaptureTestCase):
log = date + " [sshd] error: PAM: Authentication failure"
datelog = self.datedetector.getTime(log)
self.assertFalse(datelog)
def testGetEpochMsTime(self):
self.__datedetector = DateDetector()
self.__datedetector.appendTemplate('LEPOCH')
# correct short/long epoch time, using all variants:
for fact in (1, 1000, 1000000):
for dateUnix in (1138049999, 32535244799):
for date in ("%s", "[%s]", "[%s]", "audit(%s:101)"):
dateLong = dateUnix * fact
date = date % dateLong
log = date + " [sshd] error: PAM: Authentication failure"
datelog = self.datedetector.getTime(log)
self.assertTrue(datelog, "Parse epoch time for %s failed" % (date,))
( datelog, matchlog ) = datelog
self.assertEqual(int(datelog), dateUnix)
self.assertEqual(matchlog.group(1), str(dateLong))
# wrong, no epoch time (< 10 digits, more as 17 digits, begin/end of word) :
for dateUnix in ('123456789', '999999999999999999', '1138049999A', 'A1138049999'):
for date in ("%s", "[%s]", "[%s.555]", "audit(%s.555:101)"):
date = date % dateUnix
log = date + " [sshd] error: PAM: Authentication failure"
datelog = self.datedetector.getTime(log)
self.assertFalse(datelog)
def testGetEpochPattern(self):
self.__datedetector = DateDetector()
self.__datedetector.appendTemplate('(?<=\|\s){LEPOCH}(?=\s\|)')
# correct short/long epoch time, using all variants:
for fact in (1, 1000, 1000000):
for dateUnix in (1138049999, 32535244799):
dateLong = dateUnix * fact
log = "auth-error | %s | invalid password" % dateLong
datelog = self.datedetector.getTime(log)
self.assertTrue(datelog, "Parse epoch time failed: %r" % (log,))
( datelog, matchlog ) = datelog
self.assertEqual(int(datelog), dateUnix)
self.assertEqual(matchlog.group(1), str(dateLong))
# wrong epoch time format (does not match pattern):
for log in ("test%s123", "test-right | %stest", "test%s | test-left"):
log = log % dateLong
datelog = self.datedetector.getTime(log)
self.assertFalse(datelog)
def testGetTime(self):
log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure"

View File

@ -290,6 +290,17 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testRegexEpochPatterns(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"-r", "-d", "^\[{LEPOCH}\]\s+", "--maxlines", "5",
"[1516469849] 192.0.2.1 FAIL: failure\n"
"[1516469849551] 192.0.2.2 FAIL: failure\n"
"[1516469849551000] 192.0.2.3 FAIL: failure\n"
"[1516469849551.000] 192.0.2.4 FAIL: failure",
r"^<HOST> FAIL\b"
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
def testWrongFilterFile(self):
# use test log as filter file to cover eror cases...