From 3119f8170515bf7539c3e77ffd7cea55f7212c11 Mon Sep 17 00:00:00 2001 From: sebres Date: Mon, 5 Sep 2016 19:44:32 +0200 Subject: [PATCH] fixed journal systemd ascii/utf-8 default converting (see gh-1341, gh-1344) --- fail2ban/server/filter.py | 66 ++++++++++++++---- fail2ban/server/filtersystemd.py | 82 +++++++++-------------- fail2ban/tests/files/testcase-journal.log | 8 +-- fail2ban/tests/filtertestcase.py | 50 ++++++++++++-- 4 files changed, 136 insertions(+), 70 deletions(-) diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 98d0bce3..8d3f9014 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -24,6 +24,7 @@ __license__ = "GPL" import codecs import fcntl import locale +import logging import os import re import sys @@ -394,11 +395,35 @@ class Filter(JailThread): return False + if sys.version_info >= (3,): + @staticmethod + def uni_decode(x, enc, errors='strict'): + try: + if isinstance(x, bytes): + return x.decode(enc, errors) + return x + except (UnicodeDecodeError, UnicodeEncodeError): + if errors != 'strict': # pragma: no cover - unsure if reachable + raise + return uni_decode(x, enc, 'replace') + else: + @staticmethod + def uni_decode(x, enc, errors='strict'): + try: + if isinstance(x, unicode): + return x.encode(enc, errors) + return x + except (UnicodeDecodeError, UnicodeEncodeError): + if errors != 'strict': # pragma: no cover - unsure if reachable + raise + return uni_decode(x, enc, 'replace') + def processLine(self, line, date=None, returnRawHost=False, - checkAllRegex=False): + checkAllRegex=False, checkFindTime=False): """Split the time portion from log msg and return findFailures on them """ if date: + # be sure each element of tuple line has the same type: tupleLine = line else: l = line.rstrip('\r\n') @@ -414,7 +439,7 @@ class Filter(JailThread): tupleLine = (l, "", "") return "".join(tupleLine[::2]), self.findFailure( - tupleLine, date, returnRawHost, checkAllRegex) + tupleLine, date, returnRawHost, checkAllRegex, checkFindTime) def processLineAndAdd(self, line, date=None): """Processes the line for failures and populates failManager @@ -457,7 +482,7 @@ class Filter(JailThread): # @return a dict with IP and timestamp. def findFailure(self, tupleLine, date=None, returnRawHost=False, - checkAllRegex=False): + checkAllRegex=False, checkFindTime=False): failList = list() # Checks if we must ignore this line. @@ -489,6 +514,11 @@ class Filter(JailThread): timeText = self.__lastTimeText or "".join(tupleLine[::2]) date = self.__lastDate + if checkFindTime and date is not None and date < MyTime.time() - self.getFindTime(): + logSys.log(5, "Ignore line since time %s < %s - %s", + date, MyTime.time(), self.getFindTime()) + return failList + self.__lineBuffer = ( self.__lineBuffer + [tupleLine])[-self.__lineBufferSize:] logSys.log(5, "Looking for failregex match of %r" % self.__lineBuffer) @@ -791,17 +821,25 @@ class FileContainer: @staticmethod def decode_line(filename, enc, line): try: - line = line.decode(enc, 'strict') - except UnicodeDecodeError: - logSys.warning( - "Error decoding line from '%s' with '%s'." - " Consider setting logencoding=utf-8 (or another appropriate" - " encoding) for this jail. Continuing" - " to process line ignoring invalid characters: %r" % - (filename, enc, line)) + return line.decode(enc, 'strict') + except UnicodeDecodeError as e: # decode with replacing error chars: - line = line.decode(enc, 'replace') - return line + rline = line.decode(enc, 'replace') + except UnicodeEncodeError as e: + # encode with replacing error chars: + rline = line.decode(enc, 'replace') + global _decode_line_warn + lev = logging.DEBUG + if _decode_line_warn.get(filename, 0) <= MyTime.time(): + lev = logging.WARNING + _decode_line_warn[filename] = MyTime.time() + 24*60*60 + logSys.log(lev, + "Error decoding line from '%s' with '%s'." + " Consider setting logencoding=utf-8 (or another appropriate" + " encoding) for this jail. Continuing" + " to process line ignoring invalid characters: %r", + filename, enc, line) + return rline def readline(self): if self.__handler is None: @@ -819,6 +857,8 @@ class FileContainer: ## print "D: Closed %s with pos %d" % (handler, self.__pos) ## sys.stdout.flush() +_decode_line_warn = {} + ## # JournalFilter class. diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index 9c4c7865..5c5481b9 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -31,7 +31,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204': raise ImportError("Fail2Ban requires systemd >= 204") from .failmanager import FailManagerEmpty -from .filter import JournalFilter +from .filter import JournalFilter, Filter, locale from .mytime import MyTime from ..helpers import getLogger, logging, splitwords @@ -170,21 +170,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover def getJournalMatch(self): return self.__matches - ## - # Join group of log elements which may be a mix of bytes and strings - # - # @param elements list of strings and bytes - # @return elements joined as string - @staticmethod - def _joinStrAndBytes(elements): - strElements = [] - for element in elements: - if isinstance(element, str): - strElements.append(element) - else: - strElements.append(str(element, errors='ignore')) - return " ".join(strElements) + def uni_decode(x): + v = Filter.uni_decode(x, locale.getpreferredencoding()) + return v ## # Format journal log entry into syslog style @@ -194,48 +183,43 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover @classmethod def formatJournalEntry(cls, logentry): - logelements = [""] - if logentry.get('_HOSTNAME'): - logelements.append(logentry['_HOSTNAME']) - if logentry.get('SYSLOG_IDENTIFIER'): - logelements.append(logentry['SYSLOG_IDENTIFIER']) - if logentry.get('SYSLOG_PID'): - logelements[-1] += ("[%i]" % logentry['SYSLOG_PID']) - elif logentry.get('_PID'): - logelements[-1] += ("[%i]" % logentry['_PID']) + # Be sure, all argument of line tuple should have the same type: + uni_decode = FilterSystemd.uni_decode + logelements = [] + v = logentry.get('_HOSTNAME') + if v: + logelements.append(uni_decode(v)) + v = logentry.get('SYSLOG_IDENTIFIER') + if not v: + v = logentry.get('_COMM') + if v: + logelements.append(uni_decode(v)) + v = logentry.get('SYSLOG_PID') + if not v: + v = logentry.get('_PID') + if v: + logelements[-1] += ("[%i]" % v) logelements[-1] += ":" - elif logentry.get('_COMM'): - logelements.append(logentry['_COMM']) - if logentry.get('_PID'): - logelements[-1] += ("[%i]" % logentry['_PID']) - logelements[-1] += ":" - if logelements[-1] == "kernel:": - if '_SOURCE_MONOTONIC_TIMESTAMP' in logentry: - monotonic = logentry.get('_SOURCE_MONOTONIC_TIMESTAMP') - else: - monotonic = logentry.get('__MONOTONIC_TIMESTAMP')[0] - logelements.append("[%12.6f]" % monotonic.total_seconds()) - if isinstance(logentry.get('MESSAGE',''), list): - logelements.append(" ".join(logentry['MESSAGE'])) + if logelements[-1] == "kernel:": + if '_SOURCE_MONOTONIC_TIMESTAMP' in logentry: + monotonic = logentry.get('_SOURCE_MONOTONIC_TIMESTAMP') + else: + monotonic = logentry.get('__MONOTONIC_TIMESTAMP')[0] + logelements.append("[%12.6f]" % monotonic.total_seconds()) + msg = logentry.get('MESSAGE','') + if isinstance(msg, list): + logelements.append(" ".join(uni_decode(v) for v in msg)) else: - logelements.append(logentry.get('MESSAGE', '')) + logelements.append(uni_decode(msg)) - try: - logline = u" ".join(logelements) - except UnicodeDecodeError: - # Python 2, so treat as string - logline = " ".join([str(logline) for logline in logelements]) - except TypeError: - # Python 3, one or more elements bytes - logSys.warning("Error decoding log elements from journal: %s" % - repr(logelements)) - logline = cls._joinStrAndBytes(logelements) + logline = " ".join(logelements) date = logentry.get('_SOURCE_REALTIME_TIMESTAMP', logentry.get('__REALTIME_TIMESTAMP')) logSys.debug("Read systemd journal entry: %r" % "".join([date.isoformat(), logline])) - return (('', date.isoformat(), logline), + ## use the same type for 1st argument: + return ((logline[:0], date.isoformat(), logline), time.mktime(date.timetuple()) + date.microsecond/1.0E6) def seekToTime(self, date): diff --git a/fail2ban/tests/files/testcase-journal.log b/fail2ban/tests/files/testcase-journal.log index 720a3130..b6fa427b 100644 --- a/fail2ban/tests/files/testcase-journal.log +++ b/fail2ban/tests/files/testcase-journal.log @@ -13,7 +13,7 @@ error: PAM: Authentication failure for kevin from 193.168.0.128 error: PAM: Authentication failure for kevin from 193.168.0.128 error: PAM: Authentication failure for kevin from 193.168.0.128 error: PAM: Authentication failure for kevin from 193.168.0.128 -error: PAM: Authentication failure for kevin from 87.142.124.10 -error: PAM: Authentication failure for kevin from 87.142.124.10 -error: PAM: Authentication failure for kevin from 87.142.124.10 -error: PAM: Authentication failure for kevin from 87.142.124.10 +error: PAM: Authentication failure for göran from 87.142.124.10 +error: PAM: Authentication failure for göran from 87.142.124.10 +error: PAM: Authentication failure for göran from 87.142.124.10 +error: PAM: Authentication failure for göran from 87.142.124.10 diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 4f9d8a06..e9971a06 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -38,7 +38,7 @@ except ImportError: from ..server.jail import Jail from ..server.filterpoll import FilterPoll -from ..server.filter import Filter, FileFilter, DNSUtils +from ..server.filter import Filter, FileFilter, FileContainer, locale, DNSUtils from ..server.failmanager import FailManagerEmpty from ..server.mytime import MyTime from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase @@ -166,6 +166,10 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line return fout +TEST_JOURNAL_FIELDS = { + "SYSLOG_IDENTIFIER": "fail2ban-testcases", + "PRIORITY": "7", +} def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # pragma: systemd no cover """Copy lines from one file to systemd journal @@ -176,9 +180,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p else: fin = in_ # Required for filtering - fields.update({"SYSLOG_IDENTIFIER": "fail2ban-testcases", - "PRIORITY": "7", - }) + fields.update(TEST_JOURNAL_FIELDS) # Skip for i in xrange(skip): fin.readline() @@ -228,6 +230,19 @@ class BasicFilter(unittest.TestCase): 1) ) + def testWrongCharInTupleLine(self): + ## line tuple has different types (ascii after ascii / unicode): + for a1 in ('', u'', b''): + for a2 in ('2016-09-05T20:18:56', u'2016-09-05T20:18:56', b'2016-09-05T20:18:56'): + for a3 in ( + 'Fail for "g\xc3\xb6ran" from 192.0.2.1', + u'Fail for "g\xc3\xb6ran" from 192.0.2.1', + b'Fail for "g\xc3\xb6ran" from 192.0.2.1' + ): + # join should work if all arguments have the same type: + enc = locale.getpreferredencoding() + "".join([Filter.uni_decode(v, enc) for v in (a1, a2, a3)]) + class IgnoreIP(LogCaptureTestCase): @@ -828,6 +843,33 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover # we should detect the failures self.assertTrue(self.isFilled(6)) + def test_WrongChar(self): + self._initFilter() + self.filter.start() + # Now let's feed it with entries from the file + _copy_lines_to_journal( + self.test_file, self.journal_fields, skip=15, n=4) + self.assertTrue(self.isFilled(10)) + self.assert_correct_ban("87.142.124.10", 4) + # Add direct utf, unicode, blob: + for l in ( + "error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1", + u"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1", + b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'), + "error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2", + u"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2", + b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace') + ): + fields = self.journal_fields + fields.update(TEST_JOURNAL_FIELDS) + journal.send(MESSAGE=l, **fields) + self.assertTrue(self.isFilled(10)) + endtm = MyTime.time()+10 + while len(self.jail) != 2 and MyTime.time() < endtm: + time.sleep(0.10) + self.assertEqual(sorted([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()]), + ["192.0.2.1", "192.0.2.2"]) + return MonitorJournalFailures