mirror of https://github.com/fail2ban/fail2ban
fixed fail2ban-regex reads invalid character (in sense of given encoding); continuing to process line ignoring invalid characters (still has no test cases).
filter test cases added for same issue inside fail2ban-server / fail2ban-testcases; closes gh-1248pull/1249/head
parent
b100ee6302
commit
a42aa726ab
|
@ -84,8 +84,14 @@ def file_lines_gen(hdlr):
|
|||
try:
|
||||
line = line.decode(fail2banRegex.encoding, 'strict')
|
||||
except UnicodeDecodeError:
|
||||
if sys.version_info >= (3,): # Python 3 must be decoded
|
||||
line = line.decode(fail2banRegex.encoding, 'ignore')
|
||||
logSys.warning(
|
||||
"Error decoding line from '%s' with '%s'."
|
||||
" Consider setting logencoding=utf-8 (or another appropriate"
|
||||
" encoding) for this jail. Continuing"
|
||||
" to process line ignoring invalid characters: %r" %
|
||||
('<LOG>', fail2banRegex.encoding, line))
|
||||
# decode with replacing error chars:
|
||||
line = line.decode(fail2banRegex.encoding, 'replace')
|
||||
yield line
|
||||
|
||||
def journal_lines_gen(myjournal):
|
||||
|
|
|
@ -90,7 +90,11 @@ def _assert_equal_entries(utest, found, output, count=None):
|
|||
found_time, output_time = \
|
||||
MyTime.localtime(found[2]),\
|
||||
MyTime.localtime(output[2])
|
||||
utest.assertEqual(found_time, output_time)
|
||||
try:
|
||||
utest.assertEqual(found_time, output_time)
|
||||
except AssertionError as e:
|
||||
# assert more structured:
|
||||
utest.assertEqual((float(found[2]), found_time), (float(output[2]), output_time))
|
||||
if len(output) > 3 and count is None: # match matches
|
||||
# do not check if custom count (e.g. going through them twice)
|
||||
if os.linesep != '\n' or sys.platform.startswith('cygwin'):
|
||||
|
@ -216,6 +220,14 @@ class BasicFilter(unittest.TestCase):
|
|||
("^%Y-%m-%d-%H%M%S.%f %z",
|
||||
"^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset"))
|
||||
|
||||
def testAssertWrongTime(self):
|
||||
self.assertRaises(AssertionError,
|
||||
lambda: _assert_equal_entries(self,
|
||||
('1.1.1.1', 1, 1421262060.0),
|
||||
('1.1.1.1', 1, 1421262059.0),
|
||||
1)
|
||||
)
|
||||
|
||||
|
||||
class IgnoreIP(LogCaptureTestCase):
|
||||
|
||||
|
@ -900,6 +912,41 @@ class GetFailures(unittest.TestCase):
|
|||
except FailManagerEmpty:
|
||||
pass
|
||||
|
||||
def testGetFailuresWrongChar(self):
|
||||
# write wrong utf-8 char:
|
||||
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
|
||||
fout = fopen(fname, 'wb')
|
||||
try:
|
||||
# write:
|
||||
for l in (
|
||||
b'2015-01-14 20:00:58 user \"test\xf1ing\" from \"192.0.2.0\"\n', # wrong utf-8 char
|
||||
b'2015-01-14 20:00:59 user \"\xd1\xe2\xe5\xf2\xe0\" from \"192.0.2.0\"\n', # wrong utf-8 chars
|
||||
b'2015-01-14 20:01:00 user \"testing\" from \"192.0.2.0\"\n' # correct utf-8 chars
|
||||
):
|
||||
fout.write(l)
|
||||
fout.close()
|
||||
#
|
||||
output = ('192.0.2.0', 3, 1421262060.0)
|
||||
failregex = "^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"
|
||||
|
||||
# encoding - auto
|
||||
self.filter.addLogPath(fname)
|
||||
self.filter.addFailRegex(failregex)
|
||||
self.filter.getFailures(fname)
|
||||
_assert_correct_last_attempt(self, self.filter, output)
|
||||
|
||||
# test direct set of encoding:
|
||||
for enc in ('utf-8', 'ascii'):
|
||||
self.tearDown();self.setUp();
|
||||
self.filter.setLogEncoding('utf-8');
|
||||
self.filter.addLogPath(fname)
|
||||
self.filter.addFailRegex(failregex)
|
||||
self.filter.getFailures(fname)
|
||||
_assert_correct_last_attempt(self, self.filter, output)
|
||||
|
||||
finally:
|
||||
_killfile(fout, fname)
|
||||
|
||||
def testGetFailuresUseDNS(self):
|
||||
# We should still catch failures with usedns = no ;-)
|
||||
output_yes = ('93.184.216.34', 2, 1124013539.0,
|
||||
|
|
Loading…
Reference in New Issue