fail2ban-regex: fixed matched output by multi-line (buffered) parsing + and multi-line debuggex URL;

test coverage extended;
pull/1733/head
sebres 8 years ago
parent bc888e0753
commit 990d9a66da

@ -55,11 +55,14 @@ from ..helpers import str2LogLevel, getVerbosityFormat, FormatterWithTraceBack,
# Gets the instance of the logger.
logSys = getLogger("fail2ban")
def debuggexURL(sample, regex, useDns="yes"):
q = urllib.urlencode({ 're': Regex._resolveHostTag(regex, useDns=useDns),
def debuggexURL(sample, regex, multiline=False, useDns="yes"):
args = {
're': Regex._resolveHostTag(regex, useDns=useDns),
'str': sample,
'flavor': 'python' })
return 'https://www.debuggex.com/?' + q
'flavor': 'python'
}
if multiline: args['flags'] = 'm'
return 'https://www.debuggex.com/?' + urllib.urlencode(args)
def output(args): # pragma: no cover (overriden in test-cases)
print(args)
@ -400,6 +403,7 @@ class Fail2banRegex(object):
fullBuffer = len(orgLineBuffer) >= self._filter.getMaxLines()
try:
ret = self._filter.processLine(line, date)
lines = []
line = self._filter.processedLine()
for match in ret:
# Append True/False flag depending if line was matched by
@ -422,9 +426,17 @@ class Fail2banRegex(object):
"".join(bufLine[::2])))
except ValueError:
pass
# if buffering - add also another lines from match:
if self._print_all_matched:
if not self._debuggex:
self._line_stats.matched_lines.append("".join(bufLine))
else:
lines.append(bufLine[0] + bufLine[2])
self._line_stats.matched += 1
self._line_stats.missed -= 1
if lines: # pre-lines parsed in multiline mode (buffering)
lines.append(line)
line = "\n".join(lines)
return line, ret
def process(self, test_lines):
@ -472,6 +484,7 @@ class Fail2banRegex(object):
assert(self._line_stats.missed == lstats.tested - (lstats.matched + lstats.ignored))
lines = lstats[ltype]
l = lstats[ltype + '_lines']
multiline = self._filter.getMaxLines() > 1
if lines:
header = "%s line(s):" % (ltype.capitalize(),)
if self._debuggex:
@ -485,7 +498,8 @@ class Fail2banRegex(object):
for arg in [l, regexlist]:
ans = [ x + [y] for x in ans for y in arg ]
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' +
debuggexURL(self.encode_line(a[0]), a[1].getFailRegex(), self._opts.usedns), ans)
debuggexURL(self.encode_line(a[0]), a[1].getFailRegex(),
multiline, self._opts.usedns), ans)
pprint_list([x.rstrip() for x in b], header)
else:
output( "%s too many to print. Use --print-all-%s " \
@ -599,8 +613,19 @@ class Fail2banRegex(object):
output( "Use journal match : %s" % " ".join(journalmatch) )
test_lines = journal_lines_gen(flt, myjournal)
else:
output( "Use single line : %s" % shortstr(cmd_log) )
# if single line parsing (without buffering)
if self._filter.getMaxLines() <= 1:
output( "Use single line : %s" % shortstr(cmd_log.replace("\n", r"\n")) )
test_lines = [ cmd_log ]
else: # multi line parsing (with buffering)
test_lines = cmd_log.split("\n")
output( "Use multi line : %s line(s)" % len(test_lines) )
for i, l in enumerate(test_lines):
if i >= 5:
output( "| ..." ); break
output( "| %2.2s: %s" % (i+1, shortstr(l)) )
output( "`-" )
output( "" )
self.process(test_lines)

@ -252,6 +252,43 @@ class Fail2banRegexTest(LogCaptureTestCase):
)
self.assertTrue(fail2banRegex.start(args))
def testDirectMultilineBuf(self):
# test it with some pre-lines also to cover correct buffer scrolling (all multi-lines printed):
for preLines in (0, 20):
self.pruneLog("[test-phase %s]" % preLines)
(opts, args, fail2banRegex) = _Fail2banRegex(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--maxlines", "5",
("1490349000 TEST-NL\n"*preLines) +
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: %s lines, 0 ignored, 2 matched, %s missed' % (preLines+4, preLines+2))
# both matched lines were printed:
self.assertLogged("| 1490349000 FAIL", "| 1490349001 HOST 192.0.2.34", all=True)
def testDirectMultilineBufDebuggex(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--usedns", "no", "-d", "^Epoch", "--debuggex", "--print-all-matched", "--maxlines", "5",
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
self.assertLogged("&flags=m")
def testSinglelineWithNLinContent(self):
#
(opts, args, fail2banRegex) = _Fail2banRegex(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched",
"1490349000 FAIL: failure\nhost: 192.0.2.35",
r"^\s*FAIL:\s*.*\nhost:\s+<HOST>$"
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testWrongFilterFile(self):
# use test log as filter file to cover eror cases...
(opts, args, fail2banRegex) = _Fail2banRegex(

Loading…
Cancel
Save