fail2ban-regex: reimplemented log-file iterator - uses FileContainer facilities now instead of direct read from file and decode;

fail2banregextestcase.py extended to cover proper line-ending handling by interim NL char as part of multi-byte encodings (utf-16be, utf-16le)
pull/2337/merge
sebres 2021-03-24 16:19:06 +01:00
parent cbac7c176a
commit 9659033523
4 changed files with 49 additions and 15 deletions

View File

@ -289,9 +289,6 @@ class Fail2banRegex(object):
def output(self, line):
if not self._opts.out: output(line)
def decode_line(self, line):
return FileContainer.decode_line('<LOG>', self._encoding, line)
def encode_line(self, line):
return line.encode(self._encoding, 'ignore')
@ -724,8 +721,12 @@ class Fail2banRegex(object):
return True
def file_lines_gen(self, hdlr):
for line in hdlr:
yield self.decode_line(line)
while 1:
line = hdlr.readline()
if line is None:
break
yield line
hdlr.close()
def start(self, args):
@ -745,7 +746,7 @@ class Fail2banRegex(object):
if os.path.isfile(cmd_log):
try:
hdlr = open(cmd_log, 'rb')
hdlr = FileContainer(cmd_log, self._encoding, doOpen=True)
self.output( "Use log file : %s" % cmd_log )
self.output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr)

View File

@ -1278,7 +1278,7 @@ except ImportError: # pragma: no cover
class FileContainer:
def __init__(self, filename, encoding, tail=False):
def __init__(self, filename, encoding, tail=False, doOpen=False):
self.__filename = filename
self.setEncoding(encoding)
self.__tail = tail
@ -1289,6 +1289,9 @@ class FileContainer:
self.__hashNextTime = time.time() + 30
# Try to open the file. Raises an exception if an error occurred.
handler = open(filename, 'rb')
if doOpen: # fail2ban-regex only (don't need to reopen it and check for rotation)
self.__handler = handler
return
try:
stats = os.fstat(handler.fileno())
self.__ino = stats.st_ino

View File

@ -25,6 +25,7 @@ __license__ = "GPL"
import os
import sys
import tempfile
import unittest
from ..client import fail2banregex
@ -80,6 +81,11 @@ def _test_exec_command_line(*args):
sys.stderr = _org['stderr']
return _exit_code
def _reset():
# reset global warn-counter:
from ..server.filter import _decode_line_warn
_decode_line_warn.clear()
STR_00 = "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
STR_00_NODT = "[sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
@ -122,6 +128,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
"""Call before every test case."""
LogCaptureTestCase.setUp(self)
setUpMyTime()
_reset()
def tearDown(self):
"""Call after every test case."""
@ -454,14 +461,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
FILENAME_ZZZ_GEN, FILENAME_ZZZ_GEN
))
def _reset(self):
# reset global warn-counter:
from ..server.filter import _decode_line_warn
_decode_line_warn.clear()
def testWronChar(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
@ -477,7 +478,6 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronCharDebuggex(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
@ -490,6 +490,36 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged('https://')
def testNLCharAsPartOfUniChar(self):
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='uni')
# test two multi-byte encodings (both contains `\x0A` in either \x02\x0A or \x0A\x02):
for enc in ('utf-16be', 'utf-16le'):
self.pruneLog("[test-phase encoding=%s]" % enc)
try:
fout = open(fname, 'wb')
# test on unicode string containing \x0A as part of uni-char,
# it must produce exactly 2 lines (both are failures):
for l in (
u'1490349000 \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n',
u'1490349000 \u20AC Failed auth: invalid user TestI from 192.0.2.2\n'
):
fout.write(l.encode(enc))
fout.close()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--encoding", enc,
"--datepattern", r"^EPOCH",
fname, r"Failed .* from <HOST>",
))
self.assertLogged(" encoding : %s" % enc,
"Lines: 2 lines, 0 ignored, 2 matched, 0 missed", all=True)
self.assertNotLogged("Missed line(s)")
finally:
fout.close()
os.unlink(fname)
def testExecCmdLine_Usage(self):
self.assertNotEqual(_test_exec_command_line(), 0)
self.pruneLog()

View File

@ -1660,7 +1660,7 @@ class GetFailures(LogCaptureTestCase):
_killfile(fout, fname)
def testNLCharAsPartOfUniChar(self):
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='uni')
# test two multi-byte encodings (both contains `\x0A` in either \x02\x0A or \x0A\x02):
for enc in ('utf-16be', 'utf-16le'):
self.pruneLog("[test-phase encoding=%s]" % enc)