fixed journal systemd ascii/utf-8 default converting (see gh-1341, gh-1344)

pull/1542/head
sebres 2016-09-05 19:44:32 +02:00
parent f6258c7b69
commit 3119f81705
4 changed files with 136 additions and 70 deletions

View File

@ -24,6 +24,7 @@ __license__ = "GPL"
import codecs
import fcntl
import locale
import logging
import os
import re
import sys
@ -394,11 +395,35 @@ class Filter(JailThread):
return False
if sys.version_info >= (3,):
@staticmethod
def uni_decode(x, enc, errors='strict'):
try:
if isinstance(x, bytes):
return x.decode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError):
if errors != 'strict': # pragma: no cover - unsure if reachable
raise
return uni_decode(x, enc, 'replace')
else:
@staticmethod
def uni_decode(x, enc, errors='strict'):
try:
if isinstance(x, unicode):
return x.encode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError):
if errors != 'strict': # pragma: no cover - unsure if reachable
raise
return uni_decode(x, enc, 'replace')
def processLine(self, line, date=None, returnRawHost=False,
checkAllRegex=False):
checkAllRegex=False, checkFindTime=False):
"""Split the time portion from log msg and return findFailures on them
"""
if date:
# be sure each element of tuple line has the same type:
tupleLine = line
else:
l = line.rstrip('\r\n')
@ -414,7 +439,7 @@ class Filter(JailThread):
tupleLine = (l, "", "")
return "".join(tupleLine[::2]), self.findFailure(
tupleLine, date, returnRawHost, checkAllRegex)
tupleLine, date, returnRawHost, checkAllRegex, checkFindTime)
def processLineAndAdd(self, line, date=None):
"""Processes the line for failures and populates failManager
@ -457,7 +482,7 @@ class Filter(JailThread):
# @return a dict with IP and timestamp.
def findFailure(self, tupleLine, date=None, returnRawHost=False,
checkAllRegex=False):
checkAllRegex=False, checkFindTime=False):
failList = list()
# Checks if we must ignore this line.
@ -489,6 +514,11 @@ class Filter(JailThread):
timeText = self.__lastTimeText or "".join(tupleLine[::2])
date = self.__lastDate
if checkFindTime and date is not None and date < MyTime.time() - self.getFindTime():
logSys.log(5, "Ignore line since time %s < %s - %s",
date, MyTime.time(), self.getFindTime())
return failList
self.__lineBuffer = (
self.__lineBuffer + [tupleLine])[-self.__lineBufferSize:]
logSys.log(5, "Looking for failregex match of %r" % self.__lineBuffer)
@ -791,17 +821,25 @@ class FileContainer:
@staticmethod
def decode_line(filename, enc, line):
try:
line = line.decode(enc, 'strict')
except UnicodeDecodeError:
logSys.warning(
return line.decode(enc, 'strict')
except UnicodeDecodeError as e:
# decode with replacing error chars:
rline = line.decode(enc, 'replace')
except UnicodeEncodeError as e:
# encode with replacing error chars:
rline = line.decode(enc, 'replace')
global _decode_line_warn
lev = logging.DEBUG
if _decode_line_warn.get(filename, 0) <= MyTime.time():
lev = logging.WARNING
_decode_line_warn[filename] = MyTime.time() + 24*60*60
logSys.log(lev,
"Error decoding line from '%s' with '%s'."
" Consider setting logencoding=utf-8 (or another appropriate"
" encoding) for this jail. Continuing"
" to process line ignoring invalid characters: %r" %
(filename, enc, line))
# decode with replacing error chars:
line = line.decode(enc, 'replace')
return line
" to process line ignoring invalid characters: %r",
filename, enc, line)
return rline
def readline(self):
if self.__handler is None:
@ -819,6 +857,8 @@ class FileContainer:
## print "D: Closed %s with pos %d" % (handler, self.__pos)
## sys.stdout.flush()
_decode_line_warn = {}
##
# JournalFilter class.

View File

@ -31,7 +31,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204':
raise ImportError("Fail2Ban requires systemd >= 204")
from .failmanager import FailManagerEmpty
from .filter import JournalFilter
from .filter import JournalFilter, Filter, locale
from .mytime import MyTime
from ..helpers import getLogger, logging, splitwords
@ -170,21 +170,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
def getJournalMatch(self):
return self.__matches
##
# Join group of log elements which may be a mix of bytes and strings
#
# @param elements list of strings and bytes
# @return elements joined as string
@staticmethod
def _joinStrAndBytes(elements):
strElements = []
for element in elements:
if isinstance(element, str):
strElements.append(element)
else:
strElements.append(str(element, errors='ignore'))
return " ".join(strElements)
def uni_decode(x):
v = Filter.uni_decode(x, locale.getpreferredencoding())
return v
##
# Format journal log entry into syslog style
@ -194,20 +183,22 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
@classmethod
def formatJournalEntry(cls, logentry):
logelements = [""]
if logentry.get('_HOSTNAME'):
logelements.append(logentry['_HOSTNAME'])
if logentry.get('SYSLOG_IDENTIFIER'):
logelements.append(logentry['SYSLOG_IDENTIFIER'])
if logentry.get('SYSLOG_PID'):
logelements[-1] += ("[%i]" % logentry['SYSLOG_PID'])
elif logentry.get('_PID'):
logelements[-1] += ("[%i]" % logentry['_PID'])
logelements[-1] += ":"
elif logentry.get('_COMM'):
logelements.append(logentry['_COMM'])
if logentry.get('_PID'):
logelements[-1] += ("[%i]" % logentry['_PID'])
# Be sure, all argument of line tuple should have the same type:
uni_decode = FilterSystemd.uni_decode
logelements = []
v = logentry.get('_HOSTNAME')
if v:
logelements.append(uni_decode(v))
v = logentry.get('SYSLOG_IDENTIFIER')
if not v:
v = logentry.get('_COMM')
if v:
logelements.append(uni_decode(v))
v = logentry.get('SYSLOG_PID')
if not v:
v = logentry.get('_PID')
if v:
logelements[-1] += ("[%i]" % v)
logelements[-1] += ":"
if logelements[-1] == "kernel:":
if '_SOURCE_MONOTONIC_TIMESTAMP' in logentry:
@ -215,27 +206,20 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
else:
monotonic = logentry.get('__MONOTONIC_TIMESTAMP')[0]
logelements.append("[%12.6f]" % monotonic.total_seconds())
if isinstance(logentry.get('MESSAGE',''), list):
logelements.append(" ".join(logentry['MESSAGE']))
msg = logentry.get('MESSAGE','')
if isinstance(msg, list):
logelements.append(" ".join(uni_decode(v) for v in msg))
else:
logelements.append(logentry.get('MESSAGE', ''))
logelements.append(uni_decode(msg))
try:
logline = u" ".join(logelements)
except UnicodeDecodeError:
# Python 2, so treat as string
logline = " ".join([str(logline) for logline in logelements])
except TypeError:
# Python 3, one or more elements bytes
logSys.warning("Error decoding log elements from journal: %s" %
repr(logelements))
logline = cls._joinStrAndBytes(logelements)
logline = " ".join(logelements)
date = logentry.get('_SOURCE_REALTIME_TIMESTAMP',
logentry.get('__REALTIME_TIMESTAMP'))
logSys.debug("Read systemd journal entry: %r" %
"".join([date.isoformat(), logline]))
return (('', date.isoformat(), logline),
## use the same type for 1st argument:
return ((logline[:0], date.isoformat(), logline),
time.mktime(date.timetuple()) + date.microsecond/1.0E6)
def seekToTime(self, date):

View File

@ -13,7 +13,7 @@ error: PAM: Authentication failure for kevin from 193.168.0.128
error: PAM: Authentication failure for kevin from 193.168.0.128
error: PAM: Authentication failure for kevin from 193.168.0.128
error: PAM: Authentication failure for kevin from 193.168.0.128
error: PAM: Authentication failure for kevin from 87.142.124.10
error: PAM: Authentication failure for kevin from 87.142.124.10
error: PAM: Authentication failure for kevin from 87.142.124.10
error: PAM: Authentication failure for kevin from 87.142.124.10
error: PAM: Authentication failure for göran from 87.142.124.10
error: PAM: Authentication failure for göran from 87.142.124.10
error: PAM: Authentication failure for göran from 87.142.124.10
error: PAM: Authentication failure for göran from 87.142.124.10

View File

@ -38,7 +38,7 @@ except ImportError:
from ..server.jail import Jail
from ..server.filterpoll import FilterPoll
from ..server.filter import Filter, FileFilter, DNSUtils
from ..server.filter import Filter, FileFilter, FileContainer, locale, DNSUtils
from ..server.failmanager import FailManagerEmpty
from ..server.mytime import MyTime
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase
@ -166,6 +166,10 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
return fout
TEST_JOURNAL_FIELDS = {
"SYSLOG_IDENTIFIER": "fail2ban-testcases",
"PRIORITY": "7",
}
def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # pragma: systemd no cover
"""Copy lines from one file to systemd journal
@ -176,9 +180,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
else:
fin = in_
# Required for filtering
fields.update({"SYSLOG_IDENTIFIER": "fail2ban-testcases",
"PRIORITY": "7",
})
fields.update(TEST_JOURNAL_FIELDS)
# Skip
for i in xrange(skip):
fin.readline()
@ -228,6 +230,19 @@ class BasicFilter(unittest.TestCase):
1)
)
def testWrongCharInTupleLine(self):
## line tuple has different types (ascii after ascii / unicode):
for a1 in ('', u'', b''):
for a2 in ('2016-09-05T20:18:56', u'2016-09-05T20:18:56', b'2016-09-05T20:18:56'):
for a3 in (
'Fail for "g\xc3\xb6ran" from 192.0.2.1',
u'Fail for "g\xc3\xb6ran" from 192.0.2.1',
b'Fail for "g\xc3\xb6ran" from 192.0.2.1'
):
# join should work if all arguments have the same type:
enc = locale.getpreferredencoding()
"".join([Filter.uni_decode(v, enc) for v in (a1, a2, a3)])
class IgnoreIP(LogCaptureTestCase):
@ -828,6 +843,33 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
# we should detect the failures
self.assertTrue(self.isFilled(6))
def test_WrongChar(self):
self._initFilter()
self.filter.start()
# Now let's feed it with entries from the file
_copy_lines_to_journal(
self.test_file, self.journal_fields, skip=15, n=4)
self.assertTrue(self.isFilled(10))
self.assert_correct_ban("87.142.124.10", 4)
# Add direct utf, unicode, blob:
for l in (
"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
u"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'),
"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
u"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace')
):
fields = self.journal_fields
fields.update(TEST_JOURNAL_FIELDS)
journal.send(MESSAGE=l, **fields)
self.assertTrue(self.isFilled(10))
endtm = MyTime.time()+10
while len(self.jail) != 2 and MyTime.time() < endtm:
time.sleep(0.10)
self.assertEqual(sorted([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()]),
["192.0.2.1", "192.0.2.2"])
return MonitorJournalFailures