diff --git a/bin/fail2ban-regex b/bin/fail2ban-regex index b7b0579f..fdbfcb7a 100755 --- a/bin/fail2ban-regex +++ b/bin/fail2ban-regex @@ -84,8 +84,14 @@ def file_lines_gen(hdlr): try: line = line.decode(fail2banRegex.encoding, 'strict') except UnicodeDecodeError: - if sys.version_info >= (3,): # Python 3 must be decoded - line = line.decode(fail2banRegex.encoding, 'ignore') + logSys.warning( + "Error decoding line from '%s' with '%s'." + " Consider setting logencoding=utf-8 (or another appropriate" + " encoding) for this jail. Continuing" + " to process line ignoring invalid characters: %r" % + ('', fail2banRegex.encoding, line)) + # decode with replacing error chars: + line = line.decode(fail2banRegex.encoding, 'replace') yield line def journal_lines_gen(myjournal): diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index acf15528..b15494f5 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -90,7 +90,11 @@ def _assert_equal_entries(utest, found, output, count=None): found_time, output_time = \ MyTime.localtime(found[2]),\ MyTime.localtime(output[2]) - utest.assertEqual(found_time, output_time) + try: + utest.assertEqual(found_time, output_time) + except AssertionError as e: + # assert more structured: + utest.assertEqual((float(found[2]), found_time), (float(output[2]), output_time)) if len(output) > 3 and count is None: # match matches # do not check if custom count (e.g. going through them twice) if os.linesep != '\n' or sys.platform.startswith('cygwin'): @@ -216,6 +220,14 @@ class BasicFilter(unittest.TestCase): ("^%Y-%m-%d-%H%M%S.%f %z", "^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset")) + def testAssertWrongTime(self): + self.assertRaises(AssertionError, + lambda: _assert_equal_entries(self, + ('1.1.1.1', 1, 1421262060.0), + ('1.1.1.1', 1, 1421262059.0), + 1) + ) + class IgnoreIP(LogCaptureTestCase): @@ -900,6 +912,41 @@ class GetFailures(unittest.TestCase): except FailManagerEmpty: pass + def testGetFailuresWrongChar(self): + # write wrong utf-8 char: + fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf') + fout = fopen(fname, 'wb') + try: + # write: + for l in ( + b'2015-01-14 20:00:58 user \"test\xf1ing\" from \"192.0.2.0\"\n', # wrong utf-8 char + b'2015-01-14 20:00:59 user \"\xd1\xe2\xe5\xf2\xe0\" from \"192.0.2.0\"\n', # wrong utf-8 chars + b'2015-01-14 20:01:00 user \"testing\" from \"192.0.2.0\"\n' # correct utf-8 chars + ): + fout.write(l) + fout.close() + # + output = ('192.0.2.0', 3, 1421262060.0) + failregex = "^\s*user \"[^\"]*\" from \"\"\s*$" + + # encoding - auto + self.filter.addLogPath(fname) + self.filter.addFailRegex(failregex) + self.filter.getFailures(fname) + _assert_correct_last_attempt(self, self.filter, output) + + # test direct set of encoding: + for enc in ('utf-8', 'ascii'): + self.tearDown();self.setUp(); + self.filter.setLogEncoding('utf-8'); + self.filter.addLogPath(fname) + self.filter.addFailRegex(failregex) + self.filter.getFailures(fname) + _assert_correct_last_attempt(self, self.filter, output) + + finally: + _killfile(fout, fname) + def testGetFailuresUseDNS(self): # We should still catch failures with usedns = no ;-) output_yes = ('93.184.216.34', 2, 1124013539.0,