resolve deprecated syntax (eliminate "invalid sequence" warnings)

pull/2427/head
sebres 2019-05-14 15:44:20 +02:00
parent 0426a24719
commit 3036ed1889
8 changed files with 80 additions and 80 deletions

View File

@ -128,52 +128,52 @@ class DateDetectorCache(object):
# 2005-01-23T21:59:59.981746, 2005-01-23 21:59:59, 2005-01-23 8:59:59 # 2005-01-23T21:59:59.981746, 2005-01-23 21:59:59, 2005-01-23 8:59:59
# simple date: 2005/01/23 21:59:59 # simple date: 2005/01/23 21:59:59
# custom for syslog-ng 2006.12.21 06:43:20 # custom for syslog-ng 2006.12.21 06:43:20
"%ExY(?P<_sep>[-/.])%m(?P=_sep)%d(?:T| ?)%H:%M:%S(?:[.,]%f)?(?:\s*%z)?", r"%ExY(?P<_sep>[-/.])%m(?P=_sep)%d(?:T| ?)%H:%M:%S(?:[.,]%f)?(?:\s*%z)?",
# asctime with optional day, subsecond and/or year: # asctime with optional day, subsecond and/or year:
# Sun Jan 23 21:59:59.011 2005 # Sun Jan 23 21:59:59.011 2005
"(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?", r"(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?",
# asctime with optional day, subsecond and/or year coming after day # asctime with optional day, subsecond and/or year coming after day
# http://bugs.debian.org/798923 # http://bugs.debian.org/798923
# Sun Jan 23 2005 21:59:59.011 # Sun Jan 23 2005 21:59:59.011
"(?:%a )?%b %d %ExY %k:%M:%S(?:\.%f)?", r"(?:%a )?%b %d %ExY %k:%M:%S(?:\.%f)?",
# simple date too (from x11vnc): 23/01/2005 21:59:59 # simple date too (from x11vnc): 23/01/2005 21:59:59
# and with optional year given by 2 digits: 23/01/05 21:59:59 # and with optional year given by 2 digits: 23/01/05 21:59:59
# (See http://bugs.debian.org/537610) # (See http://bugs.debian.org/537610)
# 17-07-2008 17:23:25 # 17-07-2008 17:23:25
"%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %k:%M:%S", r"%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %k:%M:%S",
# Apache format optional time zone: # Apache format optional time zone:
# [31/Oct/2006:09:22:55 -0000] # [31/Oct/2006:09:22:55 -0000]
# 26-Jul-2007 15:20:52 # 26-Jul-2007 15:20:52
# named 26-Jul-2007 15:20:52.252 # named 26-Jul-2007 15:20:52.252
# roundcube 26-Jul-2007 15:20:52 +0200 # roundcube 26-Jul-2007 15:20:52 +0200
"%d(?P<_sep>[-/])%b(?P=_sep)%ExY[ :]?%H:%M:%S(?:\.%f)?(?: %z)?", r"%d(?P<_sep>[-/])%b(?P=_sep)%ExY[ :]?%H:%M:%S(?:\.%f)?(?: %z)?",
# CPanel 05/20/2008:01:57:39 # CPanel 05/20/2008:01:57:39
"%m/%d/%ExY:%H:%M:%S", r"%m/%d/%ExY:%H:%M:%S",
# 01-27-2012 16:22:44.252 # 01-27-2012 16:22:44.252
# subseconds explicit to avoid possible %m<->%d confusion # subseconds explicit to avoid possible %m<->%d confusion
# with previous ("%d-%m-%ExY %k:%M:%S" by "%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %k:%M:%S") # with previous ("%d-%m-%ExY %k:%M:%S" by "%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %k:%M:%S")
"%m-%d-%ExY %k:%M:%S(?:\.%f)?", r"%m-%d-%ExY %k:%M:%S(?:\.%f)?",
# Epoch # Epoch
"EPOCH", r"EPOCH",
# Only time information in the log # Only time information in the log
"{^LN-BEG}%H:%M:%S", r"{^LN-BEG}%H:%M:%S",
# <09/16/08@05:03:30> # <09/16/08@05:03:30>
"^<%m/%d/%Exy@%H:%M:%S>", r"^<%m/%d/%Exy@%H:%M:%S>",
# MySQL: 130322 11:46:11 # MySQL: 130322 11:46:11
"%Exy%Exm%Exd ?%H:%M:%S", r"%Exy%Exm%Exd ?%H:%M:%S",
# Apache Tomcat # Apache Tomcat
"%b %d, %ExY %I:%M:%S %p", r"%b %d, %ExY %I:%M:%S %p",
# ASSP: Apr-27-13 02:33:06 # ASSP: Apr-27-13 02:33:06
"^%b-%d-%Exy %k:%M:%S", r"^%b-%d-%Exy %k:%M:%S",
# 20050123T215959, 20050123 215959, 20050123 85959 # 20050123T215959, 20050123 215959, 20050123 85959
"%ExY%Exm%Exd(?:T| ?)%ExH%ExM%ExS(?:[.,]%f)?(?:\s*%z)?", r"%ExY%Exm%Exd(?:T| ?)%ExH%ExM%ExS(?:[.,]%f)?(?:\s*%z)?",
# prefixed with optional named time zone (monit): # prefixed with optional named time zone (monit):
# PDT Apr 16 21:05:29 # PDT Apr 16 21:05:29
"(?:%Z )?(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?", r"(?:%Z )?(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?",
# +00:00 Jan 23 21:59:59.011 2005 # +00:00 Jan 23 21:59:59.011 2005
"(?:%z )?(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?", r"(?:%z )?(?:%a )?%b %d %k:%M:%S(?:\.%f)?(?: %ExY)?",
# TAI64N # TAI64N
"TAI64N", r"TAI64N",
] ]
@property @property

View File

@ -82,7 +82,7 @@ class DateTemplate(object):
return self._regex return self._regex
def setRegex(self, regex, wordBegin=True, wordEnd=True): def setRegex(self, regex, wordBegin=True, wordEnd=True):
"""Sets regex to use for searching for date in log line. r"""Sets regex to use for searching for date in log line.
Parameters Parameters
---------- ----------

View File

@ -206,15 +206,15 @@ class CommandActionTest(LogCaptureTestCase):
"Text 890 text 123 ABC") "Text 890 text 123 ABC")
self.assertEqual( self.assertEqual(
self.__action.replaceTag("<matches>", self.__action.replaceTag("<matches>",
{'matches': "some >char< should \< be[ escap}ed&\n"}), {'matches': "some >char< should \\< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
self.assertEqual( self.assertEqual(
self.__action.replaceTag("<ipmatches>", self.__action.replaceTag("<ipmatches>",
{'ipmatches': "some >char< should \< be[ escap}ed&\n"}), {'ipmatches': "some >char< should \\< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
self.assertEqual( self.assertEqual(
self.__action.replaceTag("<ipjailmatches>", self.__action.replaceTag("<ipjailmatches>",
{'ipjailmatches': "some >char< should \< be[ escap}ed&\n"}), {'ipjailmatches': "some >char< should \\< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
# Recursive # Recursive

View File

@ -103,7 +103,7 @@ class DateDetectorTest(LogCaptureTestCase):
def testGetEpochPattern(self): def testGetEpochPattern(self):
self.__datedetector = DateDetector() self.__datedetector = DateDetector()
self.__datedetector.appendTemplate('(?<=\|\s){LEPOCH}(?=\s\|)') self.__datedetector.appendTemplate(r'(?<=\|\s){LEPOCH}(?=\s\|)')
# correct short/long epoch time, using all variants: # correct short/long epoch time, using all variants:
for fact in (1, 1000, 1000000): for fact in (1, 1000, 1000000):
for dateUnix in (1138049999, 32535244799): for dateUnix in (1138049999, 32535244799):
@ -385,7 +385,7 @@ class DateDetectorTest(LogCaptureTestCase):
self.assertRaises(Exception, t.getDate, 'no date line') self.assertRaises(Exception, t.getDate, 'no date line')
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z") iso8601 = DatePatternRegex(r"%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase): class CustomDateFormatsTest(unittest.TestCase):

View File

@ -835,7 +835,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
"usedns = no", "usedns = no",
"maxretry = 3", "maxretry = 3",
"findtime = 10m", "findtime = 10m",
"failregex = ^\s*failure <F-ERRCODE>401|403</F-ERRCODE> from <HOST>", r"failregex = ^\s*failure <F-ERRCODE>401|403</F-ERRCODE> from <HOST>",
"datepattern = {^LN-BEG}EPOCH", "datepattern = {^LN-BEG}EPOCH",
"ignoreip = 127.0.0.1/8 ::1", # just to cover ignoreip in jailreader/transmitter "ignoreip = 127.0.0.1/8 ::1", # just to cover ignoreip in jailreader/transmitter
"", "",
@ -851,8 +851,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
"logpath = " + test1log, "logpath = " + test1log,
" " + test2log if 2 in enabled else "", " " + test2log if 2 in enabled else "",
" " + test3log if 2 in enabled else "", " " + test3log if 2 in enabled else "",
"failregex = ^\s*failure <F-ERRCODE>401|403</F-ERRCODE> from <HOST>", r"failregex = ^\s*failure <F-ERRCODE>401|403</F-ERRCODE> from <HOST>",
" ^\s*error <F-ERRCODE>401|403</F-ERRCODE> from <HOST>" \ r" ^\s*error <F-ERRCODE>401|403</F-ERRCODE> from <HOST>" \
if 2 in enabled else "", if 2 in enabled else "",
"enabled = true" if 1 in enabled else "", "enabled = true" if 1 in enabled else "",
"", "",

View File

@ -120,7 +120,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectFound(self): def testDirectFound(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-no-missed", "--print-all-matched", "--print-no-missed",
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
r"Authentication failure for .*? from <HOST>$" r"Authentication failure for .*? from <HOST>$"
@ -149,7 +149,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1(self): def testDirectRE_1(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-all-matched",
Fail2banRegexTest.FILENAME_01, Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00 Fail2banRegexTest.RE_00
@ -165,7 +165,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1raw(self): def testDirectRE_1raw(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--print-all-matched", "--raw",
Fail2banRegexTest.FILENAME_01, Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00 Fail2banRegexTest.RE_00
@ -175,7 +175,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1raw_noDns(self): def testDirectRE_1raw_noDns(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--usedns=no", "--print-all-matched", "--raw", "--usedns=no",
Fail2banRegexTest.FILENAME_01, Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00 Fail2banRegexTest.RE_00
@ -185,7 +185,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_2(self): def testDirectRE_2(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-all-matched",
Fail2banRegexTest.FILENAME_02, Fail2banRegexTest.FILENAME_02,
Fail2banRegexTest.RE_00 Fail2banRegexTest.RE_00
@ -195,7 +195,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testVerbose(self): def testVerbose(self):
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--timezone", "UTC+0200", "--timezone", "UTC+0200",
"--verbose", "--verbose-date", "--print-no-missed", "--verbose", "--verbose-date", "--print-no-missed",
Fail2banRegexTest.FILENAME_02, Fail2banRegexTest.FILENAME_02,
@ -332,7 +332,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
self._reset() self._reset()
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
) )
self.assertTrue(fail2banRegex.start(args)) self.assertTrue(fail2banRegex.start(args))
@ -349,7 +349,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
self._reset() self._reset()
(opts, args, fail2banRegex) = _Fail2banRegex( (opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--debuggex", "--print-all-matched", "--debuggex", "--print-all-matched",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD, Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD,
r"llinco[^\\]" r"llinco[^\\]"

View File

@ -279,10 +279,10 @@ class BasicFilter(unittest.TestCase):
def testGetSetDatePattern(self): def testGetSetDatePattern(self):
self.assertEqual(self.filter.getDatePattern(), self.assertEqual(self.filter.getDatePattern(),
(None, "Default Detectors")) (None, "Default Detectors"))
self.filter.setDatePattern("^%Y-%m-%d-%H%M%S.%f %z **") self.filter.setDatePattern(r"^%Y-%m-%d-%H%M%S\.%f %z **")
self.assertEqual(self.filter.getDatePattern(), self.assertEqual(self.filter.getDatePattern(),
("^%Y-%m-%d-%H%M%S.%f %z **", (r"^%Y-%m-%d-%H%M%S\.%f %z **",
"^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset **")) r"^Year-Month-Day-24hourMinuteSecond\.Microseconds Zone offset **"))
def testGetSetLogTimeZone(self): def testGetSetLogTimeZone(self):
self.assertEqual(self.filter.getLogTimeZone(), None) self.assertEqual(self.filter.getLogTimeZone(), None)
@ -389,7 +389,7 @@ class IgnoreIP(LogCaptureTestCase):
setUpMyTime() setUpMyTime()
self.filter.addIgnoreIP('192.168.1.0/25') self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addFailRegex('<HOST>') self.filter.addFailRegex('<HOST>')
self.filter.setDatePattern('{^LN-BEG}EPOCH') self.filter.setDatePattern(r'{^LN-BEG}EPOCH')
self.filter.processLineAndAdd('1387203300.222 192.168.1.32') self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
self.assertLogged('Ignore 192.168.1.32') self.assertLogged('Ignore 192.168.1.32')
tearDownMyTime() tearDownMyTime()
@ -580,7 +580,7 @@ class LogFileFilterPoll(unittest.TestCase):
def testSeekToTimeSmallFile(self): def testSeekToTimeSmallFile(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log') fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
time = 1417512352 time = 1417512352
f = open(fname, 'w') f = open(fname, 'w')
@ -666,7 +666,7 @@ class LogFileFilterPoll(unittest.TestCase):
def testSeekToTimeLargeFile(self): def testSeekToTimeLargeFile(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log') fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
time = 1417512352 time = 1417512352
f = open(fname, 'w') f = open(fname, 'w')
@ -723,7 +723,7 @@ class LogFileMonitor(LogCaptureTestCase):
self.filter = FilterPoll(DummyJail()) self.filter = FilterPoll(DummyJail())
self.filter.addLogPath(self.name, autoSeek=False) self.filter.addLogPath(self.name, autoSeek=False)
self.filter.active = True self.filter.active = True
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
def tearDown(self): def tearDown(self):
tearDownMyTime() tearDownMyTime()
@ -765,7 +765,7 @@ class LogFileMonitor(LogCaptureTestCase):
def testErrorProcessLine(self): def testErrorProcessLine(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
self.filter.sleeptime /= 1000.0 self.filter.sleeptime /= 1000.0
## produce error with not callable processLine: ## produce error with not callable processLine:
_org_processLine = self.filter.processLine _org_processLine = self.filter.processLine
@ -829,7 +829,7 @@ class LogFileMonitor(LogCaptureTestCase):
def testNewChangeViaGetFailures_simple(self): def testNewChangeViaGetFailures_simple(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# suck in lines from this sample log file # suck in lines from this sample log file
self.filter.getFailures(self.name) self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
@ -846,7 +846,7 @@ class LogFileMonitor(LogCaptureTestCase):
def testNewChangeViaGetFailures_rewrite(self): def testNewChangeViaGetFailures_rewrite(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# #
# if we rewrite the file at once # if we rewrite the file at once
self.file.close() self.file.close()
@ -866,7 +866,7 @@ class LogFileMonitor(LogCaptureTestCase):
def testNewChangeViaGetFailures_move(self): def testNewChangeViaGetFailures_move(self):
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# #
# if we move file into a new location while it has been open already # if we move file into a new location while it has been open already
self.file.close() self.file.close()
@ -940,9 +940,9 @@ def get_monitor_failures_testcase(Filter_):
self.filter = Filter_(self.jail) self.filter = Filter_(self.jail)
self.filter.addLogPath(self.name, autoSeek=False) self.filter.addLogPath(self.name, autoSeek=False)
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
self.filter.active = True self.filter.active = True
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.filter.start() self.filter.start()
# If filter is polling it would sleep a bit to guarantee that # If filter is polling it would sleep a bit to guarantee that
# we have initial time-stamp difference to trigger "actions" # we have initial time-stamp difference to trigger "actions"
@ -1244,7 +1244,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
"SYSLOG_IDENTIFIER=fail2ban-testcases", "SYSLOG_IDENTIFIER=fail2ban-testcases",
"TEST_FIELD=2", "TEST_FIELD=2",
"TEST_UUID=%s" % self.test_uuid]) "TEST_UUID=%s" % self.test_uuid])
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
def tearDown(self): def tearDown(self):
if self.filter and self.filter.active: if self.filter and self.filter.active:
@ -1440,7 +1440,7 @@ class GetFailures(LogCaptureTestCase):
self.filter = FileFilter(self.jail) self.filter = FileFilter(self.jail)
self.filter.active = True self.filter.active = True
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# TODO Test this # TODO Test this
#self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}") #self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
#self.filter.setTimePattern("%b %d %H:%M:%S") #self.filter.setTimePattern("%b %d %H:%M:%S")
@ -1485,7 +1485,7 @@ class GetFailures(LogCaptureTestCase):
failures = failures or GetFailures.FAILURES_01 failures = failures or GetFailures.FAILURES_01
self.filter.addLogPath(filename, autoSeek=0) self.filter.addLogPath(filename, autoSeek=0)
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$") self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$")
self.filter.getFailures(filename) self.filter.getFailures(filename)
_assert_correct_last_attempt(self, self.filter, failures) _assert_correct_last_attempt(self, self.filter, failures)
@ -1509,7 +1509,7 @@ class GetFailures(LogCaptureTestCase):
% m for m in 53, 54, 57, 58]) % m for m in 53, 54, 57, 58])
self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=0) self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=0)
self.filter.addFailRegex("Failed .* from <HOST>") self.filter.addFailRegex(r"Failed .* from <HOST>")
self.filter.getFailures(GetFailures.FILENAME_02) self.filter.getFailures(GetFailures.FILENAME_02)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
@ -1517,7 +1517,7 @@ class GetFailures(LogCaptureTestCase):
output = ('203.162.223.135', 7, 1124013544.0) output = ('203.162.223.135', 7, 1124013544.0)
self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=0) self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=0)
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown") self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
self.filter.getFailures(GetFailures.FILENAME_03) self.filter.getFailures(GetFailures.FILENAME_03)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
@ -1526,7 +1526,7 @@ class GetFailures(LogCaptureTestCase):
output = ('203.162.223.135', 5, 1124013544.0) output = ('203.162.223.135', 5, 1124013544.0)
self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2] - 4*60) self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2] - 4*60)
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown") self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
self.filter.getFailures(GetFailures.FILENAME_03) self.filter.getFailures(GetFailures.FILENAME_03)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
@ -1536,7 +1536,7 @@ class GetFailures(LogCaptureTestCase):
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2]) self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2])
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown") self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
self.filter.getFailures(GetFailures.FILENAME_03) self.filter.getFailures(GetFailures.FILENAME_03)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
@ -1548,13 +1548,13 @@ class GetFailures(LogCaptureTestCase):
('212.41.96.185', 2, 1124013598.0)) ('212.41.96.185', 2, 1124013598.0))
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern(('^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?', self.filter.setDatePattern((r'^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?',
'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?', r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?',
'^EPOCH' r'^EPOCH'
)) ))
self.filter.setMaxRetry(2) self.filter.setMaxRetry(2)
self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0) self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0)
self.filter.addFailRegex("Invalid user .* <HOST>") self.filter.addFailRegex(r"Invalid user .* <HOST>")
self.filter.getFailures(GetFailures.FILENAME_04) self.filter.getFailures(GetFailures.FILENAME_04)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
@ -1574,7 +1574,7 @@ class GetFailures(LogCaptureTestCase):
fout.close() fout.close()
# #
output = ('192.0.2.0', 3, 1421262060.0) output = ('192.0.2.0', 3, 1421262060.0)
failregex = "^\s*user \"[^\"]*\" from \"<HOST>\"\s*$" failregex = r"^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"
# test encoding auto or direct set of encoding: # test encoding auto or direct set of encoding:
for enc in (None, 'utf-8', 'ascii'): for enc in (None, 'utf-8', 'ascii'):
@ -1582,7 +1582,7 @@ class GetFailures(LogCaptureTestCase):
self.tearDown();self.setUp(); self.tearDown();self.setUp();
self.filter.setLogEncoding(enc); self.filter.setLogEncoding(enc);
# speedup search using exact date pattern: # speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
self.assertNotLogged('Error decoding line'); self.assertNotLogged('Error decoding line');
self.filter.addLogPath(fname) self.filter.addLogPath(fname)
self.filter.addFailRegex(failregex) self.filter.addFailRegex(failregex)
@ -1631,7 +1631,7 @@ class GetFailures(LogCaptureTestCase):
filter_.failManager.setMaxRetry(1) # we might have just few failures filter_.failManager.setMaxRetry(1) # we might have just few failures
filter_.addLogPath(GetFailures.FILENAME_USEDNS, autoSeek=False) filter_.addLogPath(GetFailures.FILENAME_USEDNS, autoSeek=False)
filter_.addFailRegex("Failed .* from <HOST>") filter_.addFailRegex(r"Failed .* from <HOST>")
filter_.getFailures(GetFailures.FILENAME_USEDNS) filter_.getFailures(GetFailures.FILENAME_USEDNS)
_assert_correct_last_attempt(self, filter_, output) _assert_correct_last_attempt(self, filter_, output)
@ -1639,15 +1639,15 @@ class GetFailures(LogCaptureTestCase):
output = ('141.3.81.106', 8, 1124013541.0) output = ('141.3.81.106', 8, 1124013541.0)
self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False) self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
self.filter.addFailRegex("Failed .* from <HOST>") self.filter.addFailRegex(r"Failed .* from <HOST>")
self.filter.addFailRegex("Accepted .* from <HOST>") self.filter.addFailRegex(r"Accepted .* from <HOST>")
self.filter.getFailures(GetFailures.FILENAME_02) self.filter.getFailures(GetFailures.FILENAME_02)
_assert_correct_last_attempt(self, self.filter, output) _assert_correct_last_attempt(self, self.filter, output)
def testGetFailuresIgnoreRegex(self): def testGetFailuresIgnoreRegex(self):
self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False) self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
self.filter.addFailRegex("Failed .* from <HOST>") self.filter.addFailRegex(r"Failed .* from <HOST>")
self.filter.addFailRegex("Accepted .* from <HOST>") self.filter.addFailRegex(r"Accepted .* from <HOST>")
self.filter.addIgnoreRegex("for roehl") self.filter.addIgnoreRegex("for roehl")
self.filter.getFailures(GetFailures.FILENAME_02) self.filter.getFailures(GetFailures.FILENAME_02)
@ -1659,7 +1659,7 @@ class GetFailures(LogCaptureTestCase):
("192.0.43.11", 1, 1124013598.0)] ("192.0.43.11", 1, 1124013598.0)]
self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
self.filter.setMaxLines(100) self.filter.setMaxLines(100)
self.filter.addFailRegex("^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
self.filter.getFailures(GetFailures.FILENAME_MULTILINE) self.filter.getFailures(GetFailures.FILENAME_MULTILINE)
@ -1677,7 +1677,7 @@ class GetFailures(LogCaptureTestCase):
output = [("192.0.43.10", 2, 1124013599.0)] output = [("192.0.43.10", 2, 1124013599.0)]
self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
self.filter.setMaxLines(100) self.filter.setMaxLines(100)
self.filter.addFailRegex("^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
self.filter.addIgnoreRegex("rsync error: Received SIGINT") self.filter.addIgnoreRegex("rsync error: Received SIGINT")
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
@ -1693,8 +1693,8 @@ class GetFailures(LogCaptureTestCase):
("192.0.43.15", 1, 1124013598.0)] ("192.0.43.15", 1, 1124013598.0)]
self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
self.filter.setMaxLines(100) self.filter.setMaxLines(100)
self.filter.addFailRegex("^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
self.filter.addFailRegex("^.* sendmail\[.*, msgid=<(?P<msgid>[^>]+).*relay=\[<HOST>\].*$<SKIPLINES>^.+ spamd: result: Y \d+ .*,mid=<(?P=msgid)>(,bayes=[.\d]+)?(,autolearn=\S+)?\s*$") self.filter.addFailRegex(r"^.* sendmail\[.*, msgid=<(?P<msgid>[^>]+).*relay=\[<HOST>\].*$<SKIPLINES>^.+ spamd: result: Y \d+ .*,mid=<(?P=msgid)>(,bayes=[.\d]+)?(,autolearn=\S+)?\s*$")
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
self.filter.getFailures(GetFailures.FILENAME_MULTILINE) self.filter.getFailures(GetFailures.FILENAME_MULTILINE)

View File

@ -1017,28 +1017,28 @@ class RegexTests(unittest.TestCase):
def testHost(self): def testHost(self):
self.assertRaises(RegexException, FailRegex, '') self.assertRaises(RegexException, FailRegex, '')
self.assertRaises(RegexException, FailRegex, '^test no group$') self.assertRaises(RegexException, FailRegex, '^test no group$')
self.assertTrue(FailRegex('^test <HOST> group$')) self.assertTrue(FailRegex(r'^test <HOST> group$'))
self.assertTrue(FailRegex('^test <IP4> group$')) self.assertTrue(FailRegex(r'^test <IP4> group$'))
self.assertTrue(FailRegex('^test <IP6> group$')) self.assertTrue(FailRegex(r'^test <IP6> group$'))
self.assertTrue(FailRegex('^test <DNS> group$')) self.assertTrue(FailRegex(r'^test <DNS> group$'))
self.assertTrue(FailRegex('^test id group: ip:port = <F-ID><IP4>(?::<F-PORT/>)?</F-ID>$')) self.assertTrue(FailRegex(r'^test id group: ip:port = <F-ID><IP4>(?::<F-PORT/>)?</F-ID>$'))
self.assertTrue(FailRegex('^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$')) self.assertTrue(FailRegex(r'^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$'))
self.assertTrue(FailRegex('^test id group: anything = <F-ID/>$')) self.assertTrue(FailRegex(r'^test id group: anything = <F-ID/>$'))
# Testing obscure case when host group might be missing in the matched pattern, # Testing obscure case when host group might be missing in the matched pattern,
# e.g. if we made it optional. # e.g. if we made it optional.
fr = FailRegex('%%<HOST>?') fr = FailRegex(r'%%<HOST>?')
self.assertFalse(fr.hasMatched()) self.assertFalse(fr.hasMatched())
fr.search([('%%',"","")]) fr.search([('%%',"","")])
self.assertTrue(fr.hasMatched()) self.assertTrue(fr.hasMatched())
self.assertRaises(RegexException, fr.getHost) self.assertRaises(RegexException, fr.getHost)
# The same as above but using separated IPv4/IPv6 expressions # The same as above but using separated IPv4/IPv6 expressions
fr = FailRegex('%%inet(?:=<F-IP4/>|inet6=<F-IP6/>)?') fr = FailRegex(r'%%inet(?:=<F-IP4/>|inet6=<F-IP6/>)?')
self.assertFalse(fr.hasMatched()) self.assertFalse(fr.hasMatched())
fr.search([('%%inet=test',"","")]) fr.search([('%%inet=test',"","")])
self.assertTrue(fr.hasMatched()) self.assertTrue(fr.hasMatched())
self.assertRaises(RegexException, fr.getHost) self.assertRaises(RegexException, fr.getHost)
# Success case: using separated IPv4/IPv6 expressions (no HOST) # Success case: using separated IPv4/IPv6 expressions (no HOST)
fr = FailRegex('%%(?:inet(?:=<IP4>|6=<IP6>)?|dns=<DNS>?)') fr = FailRegex(r'%%(?:inet(?:=<IP4>|6=<IP6>)?|dns=<DNS>?)')
self.assertFalse(fr.hasMatched()) self.assertFalse(fr.hasMatched())
fr.search([('%%inet=192.0.2.1',"","")]) fr.search([('%%inet=192.0.2.1',"","")])
self.assertTrue(fr.hasMatched()) self.assertTrue(fr.hasMatched())
@ -1050,7 +1050,7 @@ class RegexTests(unittest.TestCase):
self.assertTrue(fr.hasMatched()) self.assertTrue(fr.hasMatched())
self.assertEqual(fr.getHost(), 'example.com') self.assertEqual(fr.getHost(), 'example.com')
# Success case: using user as failure-id # Success case: using user as failure-id
fr = FailRegex('^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$') fr = FailRegex(r'^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$')
self.assertFalse(fr.hasMatched()) self.assertFalse(fr.hasMatched())
fr.search([('test id group: user:(test login name)',"","")]) fr.search([('test id group: user:(test login name)',"","")])
self.assertTrue(fr.hasMatched()) self.assertTrue(fr.hasMatched())