From 1c4733ef89446c42a01a09c2f781d55d4303b246 Mon Sep 17 00:00:00 2001 From: sebres Date: Wed, 24 Aug 2016 18:29:12 +0200 Subject: [PATCH] [systemd] added new constructor parameters like journalpath, journalfiles and journalflags for systemd backup optimized FilterSystemd method `run`: better wait in idle (no busy-loop), better poll handling, the ban will executed anywhere (at least at 100th log-entry), also if we have never ending logging in this jail (e.g. extremely logging or too many failures) systemd test cases extended --- fail2ban/server/filtersystemd.py | 97 +++++++++++++++++++++++--------- fail2ban/tests/filtertestcase.py | 25 +++++--- 2 files changed, 88 insertions(+), 34 deletions(-) diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index d0ebec95..9c4c7865 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -33,7 +33,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204': from .failmanager import FailManagerEmpty from .filter import JournalFilter from .mytime import MyTime -from ..helpers import getLogger +from ..helpers import getLogger, logging, splitwords # Gets the instance of the logger. logSys = getLogger(__name__) @@ -54,14 +54,45 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover # @param jail the jail object def __init__(self, jail, **kwargs): + jrnlargs = FilterSystemd._getJournalArgs(kwargs) JournalFilter.__init__(self, jail, **kwargs) - self.__modified = False + self.__modified = 0 # Initialise systemd-journal connection - self.__journal = journal.Reader(converters={'__CURSOR': lambda x: x}) + self.__journal = journal.Reader(**jrnlargs) self.__matches = [] self.setDatePattern(None) + self.ticks = 0 logSys.debug("Created FilterSystemd") + @staticmethod + def _getJournalArgs(kwargs): + args = {'converters':{'__CURSOR': lambda x: x}} + try: + args['path'] = kwargs.pop('journalpath') + except KeyError: + pass + + try: + args['files'] = kwargs.pop('journalfiles') + except KeyError: + pass + else: + import glob + p = args['files'] + if not isinstance(p, (list, set, tuple)): + p = splitwords(p) + files = [] + for p in p: + files.extend(glob.glob(p)) + args['files'] = list(set(files)) + + try: + args['flags'] = kwargs.pop('journalflags') + except KeyError: + pass + + return args + ## # Add a journal match filters from list structure # @@ -207,6 +238,11 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover return (('', date.isoformat(), logline), time.mktime(date.timetuple()) + date.microsecond/1.0E6) + def seekToTime(self, date): + if not isinstance(date, datetime.datetime): + date = datetime.datetime.fromtimestamp(date) + self.__journal.seek_realtime(date) + ## # Main loop. # @@ -224,7 +260,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover # Seek to now - findtime in journal start_time = datetime.datetime.now() - \ datetime.timedelta(seconds=int(self.getFindTime())) - self.__journal.seek_realtime(start_time) + self.seekToTime(start_time) # Move back one entry to ensure do not end up in dead space # if start time beyond end of journal try: @@ -233,29 +269,38 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover pass # Reading failure, so safe to ignore while self.active: - if not self.idle: - while self.active: - try: - logentry = self.__journal.get_next() - except OSError: - logSys.warning( - "Error reading line from systemd journal") - continue - if logentry: - self.processLineAndAdd( - *self.formatJournalEntry(logentry)) - self.__modified = True - else: - break - if self.__modified: - try: - while True: - ticket = self.failManager.toBan() - self.jail.putFailTicket(ticket) - except FailManagerEmpty: - self.failManager.cleanup(MyTime.time()) - self.__modified = False + # wait for records (or for timeout in sleeptime seconds): self.__journal.wait(self.sleeptime) + if self.idle: + # because journal.wait will returns immediatelly if we have records in journal, + # just wait a little bit here for not idle, to prevent hi-load: + time.sleep(self.sleeptime) + continue + self.__modified = 0 + while self.active: + logentry = None + try: + logentry = self.__journal.get_next() + except OSError as e: + logSys.error("Error reading line from systemd journal: %s", + e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) + self.ticks += 1 + if logentry: + self.processLineAndAdd( + *self.formatJournalEntry(logentry)) + self.__modified += 1 + if self.__modified >= 100: # todo: should be configurable + break + else: + break + if self.__modified: + try: + while True: + ticket = self.failManager.toBan() + self.jail.putFailTicket(ticket) + except FailManagerEmpty: + self.failManager.cleanup(MyTime.time()) + logSys.debug((self.jail is not None and self.jail.name or "jailless") +" filter terminated") return True diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 40879b66..6194243b 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -707,11 +707,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover """Call before every test case.""" self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log") self.jail = DummyJail() - self.filter = Filter_(self.jail) + self.filter = None # UUID used to ensure that only meeages generated # as part of this test are picked up by the filter self.test_uuid = str(uuid.uuid4()) self.name = "monitorjournalfailures-%s" % self.test_uuid + self.journal_fields = { + 'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid} + + def _initFilter(self, **kwargs): + self.filter = Filter_(self.jail, **kwargs) self.filter.addJournalMatch([ "SYSLOG_IDENTIFIER=fail2ban-testcases", "TEST_FIELD=1", @@ -720,16 +725,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover "SYSLOG_IDENTIFIER=fail2ban-testcases", "TEST_FIELD=2", "TEST_UUID=%s" % self.test_uuid]) - self.journal_fields = { - 'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid} - self.filter.active = True self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") - self.filter.start() def tearDown(self): - self.filter.stop() - self.filter.join() # wait for the thread to terminate - pass + if self.filter and self.filter.active: + self.filter.stop() + self.filter.join() # wait for the thread to terminate + pass + + def testJournalFlagsArg(self): + self._initFilter(journalflags=0) # journal.RUNTIME_ONLY def __str__(self): return "MonitorJournalFailures%s(%s)" \ @@ -761,6 +766,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assertEqual(attempts, test_attempts) def test_grow_file(self): + self._initFilter() + self.filter.start() self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) # Now let's feed it with entries from the file @@ -790,6 +797,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assert_correct_ban("193.168.0.128", 3) def test_delJournalMatch(self): + self._initFilter() + self.filter.start() # Smoke test for removing of match # basic full test