debuggexURL fixed for wrong encoded character; test cases extended;

pull/1250/head
sebres 2015-11-10 12:19:46 +01:00
parent 38f09b417a
commit 689dfa1e6a
2 changed files with 37 additions and 8 deletions

View File

@ -204,7 +204,7 @@ class LineStats(object):
# just for convenient str
def __getitem__(self, key):
return getattr(self, key)
return getattr(self, key) if hasattr(self, key) else ''
class Fail2banRegex(object):
@ -240,7 +240,11 @@ class Fail2banRegex(object):
else:
self.encoding = locale.getpreferredencoding()
def decode_line(self, line):
return FileContainer.decode_line('<LOG>', self.encoding, line)
def encode_line(self, line):
return line.encode(self.encoding, 'ignore')
def setDatePattern(self, pattern):
if not self._datepattern_set:
@ -398,8 +402,6 @@ class Fail2banRegex(object):
self._filter.dateDetector.sortTemplate()
self._time_elapsed = time.time() - t0
def printLines(self, ltype):
lstats = self._line_stats
assert(self._line_stats.missed == lstats.tested - (lstats.matched + lstats.ignored))
@ -417,7 +419,8 @@ class Fail2banRegex(object):
ans = [[]]
for arg in [l, regexlist]:
ans = [ x + [y] for x in ans for y in arg ]
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' + debuggexURL(a[0], a[1].getFailRegex()), ans)
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' +
debuggexURL(self.encode_line(a[0]), a[1].getFailRegex()), ans)
pprint_list([x.rstrip() for x in b], header)
else:
output( "%s too many to print. Use --print-all-%s " \
@ -486,7 +489,7 @@ class Fail2banRegex(object):
def file_lines_gen(self, hdlr):
for line in hdlr:
yield FileContainer.decode_line('<LOG>', self.encoding, line)
yield self.decode_line(line)
def start(self, opts, args):
@ -507,7 +510,7 @@ class Fail2banRegex(object):
except IOError, e:
output( e )
return False
elif cmd_log == "systemd-journal":
elif cmd_log == "systemd-journal": # pragma: no cover
if not journal:
output( "Error: systemd library not found. Exiting..." )
return False

View File

@ -60,6 +60,8 @@ def _Fail2banRegex(*args):
class Fail2banRegexTest(LogCaptureTestCase):
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
@ -120,7 +122,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--print-all-matched",
Fail2banRegexTest.FILENAME_01,
r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
Fail2banRegexTest.RE_00
)
self.assertTrue(fail2banRegex.start(opts, args))
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
@ -135,11 +137,23 @@ class Fail2banRegexTest(LogCaptureTestCase):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--print-all-matched",
Fail2banRegexTest.FILENAME_02,
r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
Fail2banRegexTest.RE_00
)
self.assertTrue(fail2banRegex.start(opts, args))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
def testVerbose(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--verbose", "--print-no-missed",
Fail2banRegexTest.FILENAME_02,
Fail2banRegexTest.RE_00
)
self.assertTrue(fail2banRegex.start(opts, args))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
self.assertLogged('141.3.81.106 Fri Aug 14 11:53:59 2015')
self.assertLogged('141.3.81.106 Fri Aug 14 11:54:59 2015')
def testWronChar(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
@ -153,3 +167,15 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged('Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco')
self.assertLogged('Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llinco')
def testWronCharDebuggex(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--debuggex", "--print-all-matched",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
)
self.assertTrue(fail2banRegex.start(opts, args))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
self.assertLogged('http://')