[systemd] added new constructor parameters like journalpath, journalfiles and journalflags for systemd backup

optimized FilterSystemd method `run`: better wait in idle (no busy-loop), better poll handling, the ban will executed anywhere (at least at 100th log-entry), also if we have never ending logging in this jail (e.g. extremely logging or too many failures)
systemd test cases extended
pull/1523/head
sebres 2016-08-24 18:29:12 +02:00
parent 0ab042fcce
commit 1c4733ef89
2 changed files with 88 additions and 34 deletions

View File

@ -33,7 +33,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204':
from .failmanager import FailManagerEmpty from .failmanager import FailManagerEmpty
from .filter import JournalFilter from .filter import JournalFilter
from .mytime import MyTime from .mytime import MyTime
from ..helpers import getLogger from ..helpers import getLogger, logging, splitwords
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -54,14 +54,45 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
# @param jail the jail object # @param jail the jail object
def __init__(self, jail, **kwargs): def __init__(self, jail, **kwargs):
jrnlargs = FilterSystemd._getJournalArgs(kwargs)
JournalFilter.__init__(self, jail, **kwargs) JournalFilter.__init__(self, jail, **kwargs)
self.__modified = False self.__modified = 0
# Initialise systemd-journal connection # Initialise systemd-journal connection
self.__journal = journal.Reader(converters={'__CURSOR': lambda x: x}) self.__journal = journal.Reader(**jrnlargs)
self.__matches = [] self.__matches = []
self.setDatePattern(None) self.setDatePattern(None)
self.ticks = 0
logSys.debug("Created FilterSystemd") logSys.debug("Created FilterSystemd")
@staticmethod
def _getJournalArgs(kwargs):
args = {'converters':{'__CURSOR': lambda x: x}}
try:
args['path'] = kwargs.pop('journalpath')
except KeyError:
pass
try:
args['files'] = kwargs.pop('journalfiles')
except KeyError:
pass
else:
import glob
p = args['files']
if not isinstance(p, (list, set, tuple)):
p = splitwords(p)
files = []
for p in p:
files.extend(glob.glob(p))
args['files'] = list(set(files))
try:
args['flags'] = kwargs.pop('journalflags')
except KeyError:
pass
return args
## ##
# Add a journal match filters from list structure # Add a journal match filters from list structure
# #
@ -207,6 +238,11 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
return (('', date.isoformat(), logline), return (('', date.isoformat(), logline),
time.mktime(date.timetuple()) + date.microsecond/1.0E6) time.mktime(date.timetuple()) + date.microsecond/1.0E6)
def seekToTime(self, date):
if not isinstance(date, datetime.datetime):
date = datetime.datetime.fromtimestamp(date)
self.__journal.seek_realtime(date)
## ##
# Main loop. # Main loop.
# #
@ -224,7 +260,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
# Seek to now - findtime in journal # Seek to now - findtime in journal
start_time = datetime.datetime.now() - \ start_time = datetime.datetime.now() - \
datetime.timedelta(seconds=int(self.getFindTime())) datetime.timedelta(seconds=int(self.getFindTime()))
self.__journal.seek_realtime(start_time) self.seekToTime(start_time)
# Move back one entry to ensure do not end up in dead space # Move back one entry to ensure do not end up in dead space
# if start time beyond end of journal # if start time beyond end of journal
try: try:
@ -233,29 +269,38 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
pass # Reading failure, so safe to ignore pass # Reading failure, so safe to ignore
while self.active: while self.active:
if not self.idle: # wait for records (or for timeout in sleeptime seconds):
while self.active:
try:
logentry = self.__journal.get_next()
except OSError:
logSys.warning(
"Error reading line from systemd journal")
continue
if logentry:
self.processLineAndAdd(
*self.formatJournalEntry(logentry))
self.__modified = True
else:
break
if self.__modified:
try:
while True:
ticket = self.failManager.toBan()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
self.__modified = False
self.__journal.wait(self.sleeptime) self.__journal.wait(self.sleeptime)
if self.idle:
# because journal.wait will returns immediatelly if we have records in journal,
# just wait a little bit here for not idle, to prevent hi-load:
time.sleep(self.sleeptime)
continue
self.__modified = 0
while self.active:
logentry = None
try:
logentry = self.__journal.get_next()
except OSError as e:
logSys.error("Error reading line from systemd journal: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
self.ticks += 1
if logentry:
self.processLineAndAdd(
*self.formatJournalEntry(logentry))
self.__modified += 1
if self.__modified >= 100: # todo: should be configurable
break
else:
break
if self.__modified:
try:
while True:
ticket = self.failManager.toBan()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
logSys.debug((self.jail is not None and self.jail.name logSys.debug((self.jail is not None and self.jail.name
or "jailless") +" filter terminated") or "jailless") +" filter terminated")
return True return True

View File

@ -707,11 +707,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
"""Call before every test case.""" """Call before every test case."""
self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log") self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log")
self.jail = DummyJail() self.jail = DummyJail()
self.filter = Filter_(self.jail) self.filter = None
# UUID used to ensure that only meeages generated # UUID used to ensure that only meeages generated
# as part of this test are picked up by the filter # as part of this test are picked up by the filter
self.test_uuid = str(uuid.uuid4()) self.test_uuid = str(uuid.uuid4())
self.name = "monitorjournalfailures-%s" % self.test_uuid self.name = "monitorjournalfailures-%s" % self.test_uuid
self.journal_fields = {
'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}
def _initFilter(self, **kwargs):
self.filter = Filter_(self.jail, **kwargs)
self.filter.addJournalMatch([ self.filter.addJournalMatch([
"SYSLOG_IDENTIFIER=fail2ban-testcases", "SYSLOG_IDENTIFIER=fail2ban-testcases",
"TEST_FIELD=1", "TEST_FIELD=1",
@ -720,16 +725,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
"SYSLOG_IDENTIFIER=fail2ban-testcases", "SYSLOG_IDENTIFIER=fail2ban-testcases",
"TEST_FIELD=2", "TEST_FIELD=2",
"TEST_UUID=%s" % self.test_uuid]) "TEST_UUID=%s" % self.test_uuid])
self.journal_fields = {
'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}
self.filter.active = True
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.filter.start()
def tearDown(self): def tearDown(self):
self.filter.stop() if self.filter and self.filter.active:
self.filter.join() # wait for the thread to terminate self.filter.stop()
pass self.filter.join() # wait for the thread to terminate
pass
def testJournalFlagsArg(self):
self._initFilter(journalflags=0) # journal.RUNTIME_ONLY
def __str__(self): def __str__(self):
return "MonitorJournalFailures%s(%s)" \ return "MonitorJournalFailures%s(%s)" \
@ -761,6 +766,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.assertEqual(attempts, test_attempts) self.assertEqual(attempts, test_attempts)
def test_grow_file(self): def test_grow_file(self):
self._initFilter()
self.filter.start()
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# Now let's feed it with entries from the file # Now let's feed it with entries from the file
@ -790,6 +797,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.assert_correct_ban("193.168.0.128", 3) self.assert_correct_ban("193.168.0.128", 3)
def test_delJournalMatch(self): def test_delJournalMatch(self):
self._initFilter()
self.filter.start()
# Smoke test for removing of match # Smoke test for removing of match
# basic full test # basic full test