ENH: full timezone support

ISO8601 and %z based timezones now fully supported.

Restructured so log lines are also only parsed once and return
a unixtime and a pattern match.

Fix all test cases to adjust for the change in return value.
pull/349/head
Daniel Black 2013-09-15 23:44:30 +10:00
parent 422e2527c4
commit d8f73c0205
6 changed files with 124 additions and 118 deletions

View File

@ -208,10 +208,6 @@ class DateDetector:
finally: finally:
self.__lock.release() self.__lock.release()
def getUnixTime(self, line):
date = self.getTime(line)
return date and time.mktime(date)
## ##
# Sort the template lists using the hits score. This method is not called # Sort the template lists using the hits score. This method is not called
# in this object and thus should be called from time to time. # in this object and thus should be called from time to time.

View File

@ -24,7 +24,10 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier" __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL" __license__ = "GPL"
import re, time import re, time, calendar
from datetime import datetime
from datetime import timedelta
from mytime import MyTime from mytime import MyTime
import iso8601 import iso8601
@ -82,12 +85,11 @@ class DateEpoch(DateTemplate):
self.setRegex("^\d{10}(\.\d{6})?") self.setRegex("^\d{10}(\.\d{6})?")
def getDate(self, line): def getDate(self, line):
date = None
dateMatch = self.matchDate(line) dateMatch = self.matchDate(line)
if dateMatch: if dateMatch:
# extract part of format which represents seconds since epoch # extract part of format which represents seconds since epoch
date = list(MyTime.localtime(float(dateMatch.group()))) return (float(dateMatch.group()), dateMatch)
return date return None
## ##
@ -136,7 +138,6 @@ class DateStrptime(DateTemplate):
convertLocale = staticmethod(convertLocale) convertLocale = staticmethod(convertLocale)
def getDate(self, line): def getDate(self, line):
date = None
dateMatch = self.matchDate(line) dateMatch = self.matchDate(line)
if dateMatch: if dateMatch:
@ -151,12 +152,12 @@ class DateStrptime(DateTemplate):
logSys.debug(u"Replacing %%z with %r now %r" % (dateMatch.group('_z'), datePattern)) logSys.debug(u"Replacing %%z with %r now %r" % (dateMatch.group('_z'), datePattern))
try: try:
# Try first with 'C' locale # Try first with 'C' locale
date = list(time.strptime(dateMatch.group(), datePattern)) date = datetime.strptime(dateMatch.group(), datePattern)
except ValueError: except ValueError:
# Try to convert date string to 'C' locale # Try to convert date string to 'C' locale
conv = self.convertLocale(datePattern) conv = self.convertLocale(dateMatch.group())
try: try:
date = list(time.strptime(conv, self.getPattern())) date = datetime.strptime(conv, self.getPattern())
except (ValueError, re.error), e: except (ValueError, re.error), e:
# Try to add the current year to the pattern. Should fix # Try to add the current year to the pattern. Should fix
# the "Feb 29" issue. # the "Feb 29" issue.
@ -165,7 +166,7 @@ class DateStrptime(DateTemplate):
if not '%Y' in opattern: if not '%Y' in opattern:
pattern = "%s %%Y" % opattern pattern = "%s %%Y" % opattern
conv += " %s" % MyTime.gmtime()[0] conv += " %s" % MyTime.gmtime()[0]
date = list(time.strptime(conv, pattern)) date = datetime.strptime(conv, pattern)
else: else:
# we are helpless here # we are helpless here
raise ValueError( raise ValueError(
@ -173,35 +174,48 @@ class DateStrptime(DateTemplate):
"exception was %r and Feb 29 workaround could not " "exception was %r and Feb 29 workaround could not "
"be tested due to already present year mark in the " "be tested due to already present year mark in the "
"pattern" % (opattern, e)) "pattern" % (opattern, e))
if date[0] < 2000:
# There is probably no year field in the logs
# NOTE: Possibly makes week/year day incorrect
date[0] = MyTime.gmtime()[0]
# Bug fix for #1241756
# If the date is greater than the current time, we suppose
# that the log is not from this year but from the year before
if time.mktime(date) > MyTime.time():
logSys.debug(
u"Correcting deduced year from %d to %d since %f > %f" %
(date[0], date[0]-1, time.mktime(date), MyTime.time()))
# NOTE: Possibly makes week/year day incorrect
date[0] -= 1
elif date[1] == 1 and date[2] == 1:
# If it is Jan 1st, it is either really Jan 1st or there
# is neither month nor day in the log.
# NOTE: Possibly makes week/year day incorrect
date[1] = MyTime.gmtime()[1]
date[2] = MyTime.gmtime()[2]
if self.__unsupported_z: if self.__unsupported_z:
z = dateMatch.group('_z') z = dateMatch.group('_z')
if z: if z:
date_sec = time.mktime(date) delta = timedelta(hours=int(z[1:3]),minutes=int(z[3:]))
date_sec -= (int(z[1:3]) * 60 + int(z[3:])) * int(z[0] + '60') direction = z[0]
date = list(time.localtime(date_sec)) logSys.debug(u"Altering %r by removing time zone offset (%s)%s" % (date, direction, delta))
#date[8] = 0 # dst # here we reverse the effect of the timezone and force it to UTC
logSys.debug(u"After working with offset date now %r" % date) if direction == '+':
date -= delta
else:
date += delta
date = date.replace(tzinfo=iso8601.Utc())
else:
logSys.warn("No _z group captured and %%z is not supported on current platform"
" - timezone ignored and assumed to be localtime. date: %s on line: %s"
% (date, line))
return date if date.year < 2000:
# There is probably no year field in the logs
# NOTE: Possibly makes week/year day incorrect
date = date.replace(year=MyTime.gmtime()[0])
# Bug fix for #1241756
# If the date is greater than the current time, we suppose
# that the log is not from this year but from the year before
if date > MyTime.now():
logSys.debug(
u"Correcting deduced year by one since %s > now (%s)" %
(date, MyTime.time()))
date = date.replace(year=date.year-1)
elif date.month == 1 and date.day == 1:
# If it is Jan 1st, it is either really Jan 1st or there
# is neither month nor day in the log.
# NOTE: Possibly makes week/year day incorrect
date = date.replace(month=MyTime.gmtime()[1], day=1)
if date.tzinfo:
return ( calendar.timegm(date.utctimetuple()), dateMatch )
else:
return ( time.mktime(date.utctimetuple()), dateMatch )
return None
try: try:
time.strptime("26-Jul-2007 15:20:52.252","%d-%b-%Y %H:%M:%S.%f") time.strptime("26-Jul-2007 15:20:52.252","%d-%b-%Y %H:%M:%S.%f")
@ -224,15 +238,14 @@ class DateTai64n(DateTemplate):
self.setRegex("@[0-9a-f]{24}", wordBegin=False) self.setRegex("@[0-9a-f]{24}", wordBegin=False)
def getDate(self, line): def getDate(self, line):
date = None
dateMatch = self.matchDate(line) dateMatch = self.matchDate(line)
if dateMatch: if dateMatch:
# extract part of format which represents seconds since epoch # extract part of format which represents seconds since epoch
value = dateMatch.group() value = dateMatch.group()
seconds_since_epoch = value[2:17] seconds_since_epoch = value[2:17]
# convert seconds from HEX into local time stamp # convert seconds from HEX into local time stamp
date = list(MyTime.localtime(int(seconds_since_epoch, 16))) return (int(seconds_since_epoch, 16), dateMatch)
return date return None
class DateISO8601(DateTemplate): class DateISO8601(DateTemplate):
@ -245,11 +258,10 @@ class DateISO8601(DateTemplate):
self.setRegex(date_re) self.setRegex(date_re)
def getDate(self, line): def getDate(self, line):
date = None
dateMatch = self.matchDate(line) dateMatch = self.matchDate(line)
if dateMatch: if dateMatch:
# Parses the date. # Parses the date.
value = dateMatch.group() value = dateMatch.group()
date = list(iso8601.parse_date(value).timetuple()) return (calendar.timegm(iso8601.parse_date(value).utctimetuple()), dateMatch)
return date return None

View File

@ -295,18 +295,8 @@ class Filter(JailThread):
l = l.rstrip('\r\n') l = l.rstrip('\r\n')
logSys.log(7, "Working on line %r", l) logSys.log(7, "Working on line %r", l)
timeMatch = self.dateDetector.matchTime(l)
if timeMatch: return self.findFailure(l, returnRawHost, checkAllRegex)
# Lets split into time part and log part of the line
timeLine = timeMatch.group()
# Lets leave the beginning in as well, so if there is no
# anchore at the beginning of the time regexp, we don't
# at least allow injection. Should be harmless otherwise
logLine = l[:timeMatch.start()] + l[timeMatch.end():]
else:
timeLine = l
logLine = l
return self.findFailure(timeLine, logLine, returnRawHost, checkAllRegex)
def processLineAndAdd(self, line): def processLineAndAdd(self, line):
"""Processes the line for failures and populates failManager """Processes the line for failures and populates failManager
@ -349,16 +339,28 @@ class Filter(JailThread):
# to find the logging time. # to find the logging time.
# @return a dict with IP and timestamp. # @return a dict with IP and timestamp.
def findFailure(self, timeLine, logLine, def findFailure(self, logLine,
returnRawHost=False, checkAllRegex=False): returnRawHost=False, checkAllRegex=False):
logSys.log(5, "Date: %r, message: %r", timeLine, logLine)
failList = list() failList = list()
# Checks if we must ignore this line. # Checks if we must ignore this line.
if self.ignoreLine(logLine) is not None: if self.ignoreLine(logLine) is not None:
# The ignoreregex matched. Return. # The ignoreregex matched. Return.
logSys.log(7, "Matched ignoreregex and was ignored") logSys.log(7, "Matched ignoreregex and was \"%s\" ignored", logLine)
return failList return failList
date = self.dateDetector.getUnixTime(timeLine) dd = self.dateDetector.getTime(logLine)
if dd is None:
return failList
date = dd[0]
timeMatch = dd[1]
if timeMatch:
# Lets split into time part and log part of the line
timeLine = timeMatch.group()
# Lets leave the beginning in as well, so if there is no
# anchore at the beginning of the time regexp, we don't
# at least allow injection. Should be harmless otherwise
logLine = logLine[:timeMatch.start()] + logLine[timeMatch.end():]
# Iterates over all the regular expressions. # Iterates over all the regular expressions.
for failRegexIndex, failRegex in enumerate(self.__failRegex): for failRegexIndex, failRegex in enumerate(self.__failRegex):
failRegex.search(logLine) failRegex.search(logLine)

View File

@ -24,9 +24,10 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier" __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL" __license__ = "GPL"
import unittest, calendar, datetime, re, pprint import unittest, calendar, time, datetime, re, pprint
from server.datedetector import DateDetector from server.datedetector import DateDetector
from server.datetemplate import DateTemplate from server.datetemplate import DateTemplate
from server.iso8601 import Utc
class DateDetectorTest(unittest.TestCase): class DateDetectorTest(unittest.TestCase):
@ -40,11 +41,12 @@ class DateDetectorTest(unittest.TestCase):
def testGetEpochTime(self): def testGetEpochTime(self):
log = "1138049999 [sshd] error: PAM: Authentication failure" log = "1138049999 [sshd] error: PAM: Authentication failure"
date = [2006, 1, 23, 21, 59, 59, 0, 23, 0] #date = [2006, 1, 23, 21, 59, 59, 0, 23, 0]
dateUnix = 1138049999.0 dateUnix = 1138049999.0
self.assertEqual(self.__datedetector.getTime(log), date) ( datelog, matchlog ) = self.__datedetector.getTime(log)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix) self.assertEqual(datelog, dateUnix)
self.assertEqual(matchlog.group(), '1138049999')
def testGetTime(self): def testGetTime(self):
log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure" log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure"
@ -54,8 +56,9 @@ class DateDetectorTest(unittest.TestCase):
# is not correctly determined atm, since year is not present # is not correctly determined atm, since year is not present
# in the log entry. Since this doesn't effect the operation # in the log entry. Since this doesn't effect the operation
# of fail2ban -- we just ignore incorrect day of the week # of fail2ban -- we just ignore incorrect day of the week
self.assertEqual(self.__datedetector.getTime(log)[:6], date[:6]) ( datelog, matchlog ) = self.__datedetector.getTime(log)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix) self.assertEqual(datelog, dateUnix)
self.assertEqual(matchlog.group(), 'Jan 23 21:59:59')
def testVariousTimes(self): def testVariousTimes(self):
"""Test detection of various common date/time formats f2b should understand """Test detection of various common date/time formats f2b should understand
@ -72,16 +75,16 @@ class DateDetectorTest(unittest.TestCase):
"2005.01.23 21:59:59", "2005.01.23 21:59:59",
"23/01/2005 21:59:59", "23/01/2005 21:59:59",
"23/01/05 21:59:59", "23/01/05 21:59:59",
"23/Jan/2005:22:59:59 +0100", "23/Jan/2005:21:59:59 +0100",
"01/23/2005:21:59:59", "01/23/2005:21:59:59",
"2005-01-23 21:59:59", "2005-01-23 21:59:59",
"23-Jan-2005 21:59:59.02", "23-Jan-2005 21:59:59.02",
"23-Jan-2005 22:59:59 +0100", "23-Jan-2005 21:59:59 +0100",
"23-01-2005 21:59:59", "23-01-2005 21:59:59",
"01-23-2005 21:59:59.252", # reported on f2b, causes Feb29 fix to break "01-23-2005 21:59:59.252", # reported on f2b, causes Feb29 fix to break
"@4000000041f4104f00000000", # TAI64N "@4000000041f4104f00000000", # TAI64N
"2005-01-23T21:59:59.252Z", #ISO 8601 "2005-01-23T20:59:59.252Z", #ISO 8601
"2005-01-23T21:59:59-05:00Z", #ISO 8601 with TZ "2005-01-23T15:59:59-05:00", #ISO 8601 with TZ
"<01/23/05@21:59:59>", "<01/23/05@21:59:59>",
"050123 21:59:59", # MySQL "050123 21:59:59", # MySQL
"Jan-23-05 21:59:59", # ASSP like "Jan-23-05 21:59:59", # ASSP like
@ -92,8 +95,9 @@ class DateDetectorTest(unittest.TestCase):
# yoh: on [:6] see in above test # yoh: on [:6] see in above test
logtime = self.__datedetector.getTime(log) logtime = self.__datedetector.getTime(log)
self.assertNotEqual(logtime, None, "getTime retrieved nothing: failure for %s" % sdate) self.assertNotEqual(logtime, None, "getTime retrieved nothing: failure for %s" % sdate)
self.assertEqual(logtime[:6], date[:6], "getTime comparison failure for %s: \"%s\" is not \"%s\"" % (sdate, logtime[:6], date[:6])) ( logUnix, logMatch ) = logtime
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix, "getUnixTime failure for %s: \"%s\" is not \"%s\"" % (sdate, logtime[:6], date[:6])) self.assertEqual(logUnix, dateUnix, "getTime comparison failure for %s: \"%s\" is not \"%s\"" % (sdate, logUnix, dateUnix))
self.assertEqual(logMatch.group(), sdate)
def testStableSortTemplate(self): def testStableSortTemplate(self):
old_names = [x.getName() for x in self.__datedetector.getTemplates()] old_names = [x.getName() for x in self.__datedetector.getTemplates()]
@ -110,44 +114,29 @@ class DateDetectorTest(unittest.TestCase):
# see https://github.com/fail2ban/fail2ban/pull/130 # see https://github.com/fail2ban/fail2ban/pull/130
# yoh: unfortunately this test is not really effective to reproduce the # yoh: unfortunately this test is not really effective to reproduce the
# situation but left in place to assure consistent behavior # situation but left in place to assure consistent behavior
m1 = [2012, 10, 11, 2, 37, 17] mu = time.mktime(datetime.datetime(2012, 10, 11, 2, 37, 17).utctimetuple())
self.assertEqual( logdate = self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')[:6], self.assertNotEqual(logdate, None)
m1) ( logTime, logMatch ) = logdate
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(), '2012/10/11 02:37:17')
self.__datedetector.sortTemplate() self.__datedetector.sortTemplate()
# confuse it with year being at the end # confuse it with year being at the end
for i in xrange(10): for i in xrange(10):
self.assertEqual( ( logTime, logMatch ) = self.__datedetector.getTime('11/10/2012 02:37:17 [error] 18434#0')
self.__datedetector.getTime('11/10/2012 02:37:17 [error] 18434#0')[:6], self.assertEqual(logTime, mu)
m1) self.assertEqual(logMatch.group(), '11/10/2012 02:37:17')
self.__datedetector.sortTemplate() self.__datedetector.sortTemplate()
# and now back to the original # and now back to the original
self.assertEqual( ( logTime, logMatch ) = self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')[:6], self.assertEqual(logTime, mu)
m1) self.assertEqual(logMatch.group(), '2012/10/11 02:37:17')
def testDateDetectorTemplateOverlap(self): def testDateDetectorTemplateOverlap(self):
patterns = [template.getPattern() patterns = [template.getPattern()
for template in self.__datedetector.getTemplates() for template in self.__datedetector.getTemplates()
if hasattr(template, "getPattern")] if hasattr(template, "getPattern")]
ZERO = datetime.timedelta(0)
HOUR = datetime.timedelta(hours=1)
# A UTC class. to make %z formats work
class UTC(datetime.tzinfo):
"""UTC"""
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
year = 2008 # Leap year, 08 for %y can be confused with both %d and %m year = 2008 # Leap year, 08 for %y can be confused with both %d and %m
def iterDates(year): def iterDates(year):
for month in xrange(1, 13): for month in xrange(1, 13):
@ -156,7 +145,7 @@ class DateDetectorTest(unittest.TestCase):
for minute in xrange(0, 60, 15): for minute in xrange(0, 60, 15):
for second in xrange(0, 60, 15): # Far enough? for second in xrange(0, 60, 15): # Far enough?
yield datetime.datetime( yield datetime.datetime(
year, month, day, hour, minute, second, 300, UTC()) year, month, day, hour, minute, second, 300, Utc())
overlapedTemplates = set() overlapedTemplates = set()
for date in iterDates(year): for date in iterDates(year):

View File

@ -40,6 +40,7 @@ from server.failmanager import FailManagerEmpty
# #
from utils import mtimesleep from utils import mtimesleep
from server.mytime import MyTime
# yoh: per Steven Hiscocks's insight while troubleshooting # yoh: per Steven Hiscocks's insight while troubleshooting
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836 # https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
@ -78,8 +79,8 @@ def _assert_equal_entries(utest, found, output, count=None):
utest.assertEqual(found[0], output[0]) # IP utest.assertEqual(found[0], output[0]) # IP
utest.assertEqual(found[1], count or output[1]) # count utest.assertEqual(found[1], count or output[1]) # count
found_time, output_time = \ found_time, output_time = \
time.localtime(found[2]),\ MyTime.localtime(found[2]),\
time.localtime(output[2]) MyTime.localtime(output[2])
utest.assertEqual(found_time, output_time) utest.assertEqual(found_time, output_time)
if len(output) > 3 and count is None: # match matches if len(output) > 3 and count is None: # match matches
# do not check if custom count (e.g. going through them twice) # do not check if custom count (e.g. going through them twice)
@ -560,7 +561,7 @@ class GetFailures(unittest.TestCase):
FILENAME_USEDNS = "testcases/files/testcase-usedns.log" FILENAME_USEDNS = "testcases/files/testcase-usedns.log"
# so that they could be reused by other tests # so that they could be reused by other tests
FAILURES_01 = ('193.168.0.128', 3, 1124013599.0, FAILURES_01 = ('193.168.0.128', 3, 1124017199.0,
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3) ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
def setUp(self): def setUp(self):
@ -604,7 +605,7 @@ class GetFailures(unittest.TestCase):
def testGetFailures02(self): def testGetFailures02(self):
output = ('141.3.81.106', 4, 1124013539.0, output = ('141.3.81.106', 4, 1124017139.0,
['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n' ['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n'
% m for m in 53, 54, 57, 58]) % m for m in 53, 54, 57, 58])
@ -614,7 +615,7 @@ class GetFailures(unittest.TestCase):
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
def testGetFailures03(self): def testGetFailures03(self):
output = ('203.162.223.135', 6, 1124013544.0) output = ('203.162.223.135', 6, 1124017144.0)
self.filter.addLogPath(GetFailures.FILENAME_03) self.filter.addLogPath(GetFailures.FILENAME_03)
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown") self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
@ -622,8 +623,8 @@ class GetFailures(unittest.TestCase):
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
def testGetFailures04(self): def testGetFailures04(self):
output = [('212.41.96.186', 4, 1124013600.0), output = [('212.41.96.186', 4, 1124017200.0),
('212.41.96.185', 4, 1124013598.0)] ('212.41.96.185', 4, 1124017198.0)]
self.filter.addLogPath(GetFailures.FILENAME_04) self.filter.addLogPath(GetFailures.FILENAME_04)
self.filter.addFailRegex("Invalid user .* <HOST>") self.filter.addFailRegex("Invalid user .* <HOST>")
@ -637,11 +638,11 @@ class GetFailures(unittest.TestCase):
def testGetFailuresUseDNS(self): def testGetFailuresUseDNS(self):
# We should still catch failures with usedns = no ;-) # We should still catch failures with usedns = no ;-)
output_yes = ('93.184.216.119', 2, 1124013539.0, output_yes = ('93.184.216.119', 2, 1124017139.0,
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n', ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n',
'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.119 port 51332 ssh2\n']) 'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.119 port 51332 ssh2\n'])
output_no = ('93.184.216.119', 1, 1124013539.0, output_no = ('93.184.216.119', 1, 1124017139.0,
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.119 port 51332 ssh2\n']) ['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.119 port 51332 ssh2\n'])
# Actually no exception would be raised -- it will be just set to 'no' # Actually no exception would be raised -- it will be just set to 'no'
@ -663,7 +664,7 @@ class GetFailures(unittest.TestCase):
def testGetFailuresMultiRegex(self): def testGetFailuresMultiRegex(self):
output = ('141.3.81.106', 8, 1124013541.0) output = ('141.3.81.106', 8, 1124017141.0)
self.filter.addLogPath(GetFailures.FILENAME_02) self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>") self.filter.addFailRegex("Failed .* from <HOST>")
@ -672,7 +673,7 @@ class GetFailures(unittest.TestCase):
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
def testGetFailuresIgnoreRegex(self): def testGetFailuresIgnoreRegex(self):
output = ('141.3.81.106', 8, 1124013541.0) output = ('141.3.81.106', 8, 1124017141.0)
self.filter.addLogPath(GetFailures.FILENAME_02) self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>") self.filter.addFailRegex("Failed .* from <HOST>")

View File

@ -22,7 +22,9 @@
__copyright__ = "Copyright (c) 2013 Steven Hiscocks" __copyright__ = "Copyright (c) 2013 Steven Hiscocks"
__license__ = "GPL" __license__ = "GPL"
import unittest, sys, os, fileinput, re, datetime, inspect import unittest, sys, os, fileinput, re, time, datetime, inspect
from server.mytime import MyTime
if sys.version_info >= (2, 6): if sys.version_info >= (2, 6):
import json import json
@ -110,15 +112,19 @@ def testSampleRegexsFactory(name):
(map(lambda x: x[0], ret),logFile.filename(), logFile.filelineno())) (map(lambda x: x[0], ret),logFile.filename(), logFile.filelineno()))
# Verify timestamp and host as expected # Verify timestamp and host as expected
failregex, host, time = ret[0] failregex, host, fail2banTime = ret[0]
self.assertEqual(host, faildata.get("host", None)) self.assertEqual(host, faildata.get("host", None))
fail2banTime = datetime.datetime.fromtimestamp(time)
jsonTime = datetime.datetime.strptime( t = faildata.get("time", None)
faildata.get("time", None), "%Y-%m-%dT%H:%M:%S") jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S")
jsonTime = time.mktime(jsonTimeLocal.utctimetuple())
self.assertEqual(fail2banTime, jsonTime, self.assertEqual(fail2banTime, jsonTime,
"Time mismatch %s != %s on: %s:%i %r:" % "UTC Time mismatch fail2ban %s (%s) != failJson %s (%s) (diff %i seconds) on: %s:%i %r:" %
(fail2banTime, jsonTime, logFile.filename(), logFile.filelineno(), line ) ) (fail2banTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(fail2banTime)),
jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
fail2banTime - jsonTime, logFile.filename(), logFile.filelineno(), line ) )
regexsUsed.add(failregex) regexsUsed.add(failregex)