From d974ebd5814b8703a6776a1df1b6fcb2bec048b9 Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 1 Sep 2016 19:38:40 +0200 Subject: [PATCH] rewritten idle handling for all filters, test cases extended to work in idle mode also (should not find/process failures) --- fail2ban/server/filter.py | 2 ++ fail2ban/server/filtergamin.py | 12 +++++-- fail2ban/server/filterpoll.py | 7 +++-- fail2ban/server/filterpyinotify.py | 50 ++++++++++++++++-------------- fail2ban/server/filtersystemd.py | 3 +- fail2ban/tests/filtertestcase.py | 40 ++++++++++++++++++++---- fail2ban/tests/utils.py | 8 ++--- 7 files changed, 83 insertions(+), 39 deletions(-) diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 318d8e49..4f30719d 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -85,6 +85,8 @@ class Filter(JailThread): self.__lastDate = None ## External command self.__ignoreCommand = False + ## Ticks counter + self.ticks = 0 self.dateDetector = DateDetector() self.dateDetector.addDefaultTemplate() diff --git a/fail2ban/server/filtergamin.py b/fail2ban/server/filtergamin.py index 6b562a5c..39793067 100644 --- a/fail2ban/server/filtergamin.py +++ b/fail2ban/server/filtergamin.py @@ -69,6 +69,7 @@ class FilterGamin(FileFilter): logSys.debug("File changed: " + path) self.__modified = True + self.ticks += 1 self._process_file(path) def _process_file(self, path): @@ -105,7 +106,7 @@ class FilterGamin(FileFilter): def _handleEvents(self): ret = False mon = self.monitor - while mon and mon.event_pending(): + while mon and mon.event_pending() > 0: mon.handle_events() mon = self.monitor ret = True @@ -122,9 +123,14 @@ class FilterGamin(FileFilter): # Gamin needs a loop to collect and dispatch events while self.active: if self.idle: - time.sleep(self.sleeptime) - continue + # wait a little bit here for not idle, to prevent hi-load: + if not Utils.wait_for(lambda: not self.idle, + self.sleeptime * 10, self.sleeptime + ): + self.ticks += 1 + continue Utils.wait_for(self._handleEvents, self.sleeptime) + self.ticks += 1 logSys.debug(self.jail.name + ": filter terminated") return True diff --git a/fail2ban/server/filterpoll.py b/fail2ban/server/filterpoll.py index c7b04970..bebdcad2 100644 --- a/fail2ban/server/filterpoll.py +++ b/fail2ban/server/filterpoll.py @@ -30,8 +30,9 @@ import time from .failmanager import FailManagerEmpty from .filter import FileFilter from .mytime import MyTime +from .utils import Utils from ..helpers import getLogger -from ..server.utils import Utils + # Gets the instance of the logger. logSys = getLogger(__name__) @@ -101,8 +102,9 @@ class FilterPoll(FileFilter): self.idle, self.getLogCount()) if self.idle: if not Utils.wait_for(lambda: not self.idle, - self.sleeptime * 100, self.sleeptime + self.sleeptime * 10, self.sleeptime ): + self.ticks += 1 continue # Get file modification modlst = [] @@ -111,6 +113,7 @@ class FilterPoll(FileFilter): self.getFailures(filename) self.__modified = True + self.ticks += 1 if self.__modified: try: while True: diff --git a/fail2ban/server/filterpyinotify.py b/fail2ban/server/filterpyinotify.py index 7bd638d9..2116cb92 100644 --- a/fail2ban/server/filterpyinotify.py +++ b/fail2ban/server/filterpyinotify.py @@ -32,6 +32,7 @@ import pyinotify from .failmanager import FailManagerEmpty from .filter import FileFilter from .mytime import MyTime +from .utils import Utils from ..helpers import getLogger @@ -92,7 +93,9 @@ class FilterPyinotify(FileFilter): self._delFileWatcher(path) # place a new one self._addFileWatcher(path) - + # do nothing if idle: + if self.idle: + return self._process_file(path) def _process_file(self, path): @@ -159,6 +162,25 @@ class FilterPyinotify(FileFilter): self.__monitor.rm_watch(wdInt) logSys.debug("Removed monitor for the parent directory %s", path_dir) + # pyinotify.ProcessEvent default handler: + def __process_default(self, event): + try: + self.callback(event, origin='Default ') + except Exception as e: + logSys.error("Error in FilterPyinotify callback: %s", + e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) + self.ticks += 1 + + # slow check events while idle: + def __check_events(self, *args, **kwargs): + if self.idle: + if Utils.wait_for(lambda: not self.idle, + self.sleeptime * 10, self.sleeptime + ): + pass + self.ticks += 1 + return pyinotify.ThreadedNotifier.check_events(self.__notifier, *args, **kwargs) + ## # Main loop. # @@ -166,12 +188,13 @@ class FilterPyinotify(FileFilter): # loop is necessary def run(self): + prcevent = pyinotify.ProcessEvent() + prcevent.process_default = self.__process_default self.__notifier = pyinotify.ThreadedNotifier(self.__monitor, - ProcessPyinotify(self)) + prcevent, timeout=self.sleeptime) + self.__notifier.check_events = self.__check_events self.__notifier.start() logSys.debug("pyinotifier started for %s.", self.jail.name) - # TODO: verify that there is nothing really to be done for - # idle jails return True ## @@ -191,22 +214,3 @@ class FilterPyinotify(FileFilter): def __cleanup(self): self.__notifier = None self.__monitor = None - - -class ProcessPyinotify(pyinotify.ProcessEvent): - def __init__(self, FileFilter, **kargs): - #super(ProcessPyinotify, self).__init__(**kargs) - # for some reason root class _ProcessEvent is old-style (is - # not derived from object), so to play safe let's avoid super - # for now, and call superclass directly - pyinotify.ProcessEvent.__init__(self, **kargs) - self.__FileFilter = FileFilter - pass - - # just need default, since using mask on watch to limit events - def process_default(self, event): - try: - self.__FileFilter.callback(event, origin='Default ') - except Exception as e: - logSys.error("Error in FilterPyinotify callback: %s", - e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index 2ab2f6b4..3d41dce9 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -33,6 +33,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204': from .failmanager import FailManagerEmpty from .filter import JournalFilter from .mytime import MyTime +from .utils import Utils from ..helpers import getLogger, logging, splitwords # Gets the instance of the logger. @@ -61,7 +62,6 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover self.__journal = journal.Reader(**jrnlargs) self.__matches = [] self.setDatePattern(None) - self.ticks = 0 logSys.debug("Created FilterSystemd") @staticmethod @@ -277,6 +277,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover if not Utils.wait_for(lambda: not self.idle, self.sleeptime * 10, self.sleeptime ): + self.ticks += 1 continue self.__modified = 0 while self.active: diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 40d37bb6..c4801282 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -720,6 +720,12 @@ class CommonMonitorTestCase(unittest.TestCase): """ return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay)) + def waitForTicks(self, ticks, delay=2.): + """Wait up to `delay` sec to assure that it was modified or not + """ + last_ticks = self.filter.ticks + return Utils.wait_for(lambda: self.filter.ticks >= last_ticks + ticks, _maxWaitTime(delay)) + def get_monitor_failures_testcase(Filter_): """Generator of TestCase's for different filters/backends @@ -776,6 +782,16 @@ def get_monitor_failures_testcase(Filter_): _assert_correct_last_attempt(self, self.jail, failures, count=count) def test_grow_file(self): + self._test_grow_file() + + def test_grow_file_in_idle(self): + self._test_grow_file(True) + + def _test_grow_file(self, idle=False): + if idle: + self.filter.sleeptime /= 100.0 + self.filter.idle = True + self.waitForTicks(1) # suck in lines from this sample log file self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) @@ -787,6 +803,10 @@ def get_monitor_failures_testcase(Filter_): # since it should have not been enough _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5) + if idle: + self.waitForTicks(1) + self.assertTrue(self.isEmpty(1)) + return self.assertTrue(self.isFilled(10)) # so we sleep a bit for it not to become empty, # and meanwhile pass to other thread(s) and filter should @@ -958,12 +978,6 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.filter.join() # wait for the thread to terminate pass - def waitForTicks(self, ticks, delay=2.): - """Wait up to `delay` sec to assure that it was modified or not - """ - last_ticks = self.filter.ticks - return Utils.wait_for(lambda: self.filter.ticks >= last_ticks + ticks, _maxWaitTime(delay)) - def _getRuntimeJournal(self): # retrieve current system journal path tmp = Utils.executeCmd('find "$(systemd-path system-runtime-logs)" -name system.journal', @@ -1006,8 +1020,18 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assertEqual(attempts, test_attempts) def test_grow_file(self): + self._test_grow_file() + + def test_grow_file_in_idle(self): + self._test_grow_file(True) + + def _test_grow_file(self, idle=False): self._initFilter() self.filter.start() + if idle: + self.filter.sleeptime /= 100.0 + self.filter.idle = True + self.waitForTicks(1) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) # Now let's feed it with entries from the file @@ -1020,6 +1044,10 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover _copy_lines_to_journal( self.test_file, self.journal_fields, skip=2, n=3) + if idle: + self.waitForTicks(1) + self.assertTrue(self.isEmpty(1)) + return self.assertTrue(self.isFilled(10)) # so we sleep for up to 6 sec for it not to become empty, # and meanwhile pass to other thread(s) and filter should diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index 6d0828d4..4cbf4354 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -430,16 +430,16 @@ def gatherTests(regexps=None, opts=None): # because gamin can be very slow on some platforms (and can produce many failures # with fast sleep interval) - skip it by fast run: if unittest.F2B.fast or unittest.F2B.no_gamin: # pragma: no cover - raise Exception('Skip, fast: %s, no_gamin: %s' % (unittest.F2B.fast, unittest.F2B.no_gamin)) + raise ImportError('Skip, fast: %s, no_gamin: %s' % (unittest.F2B.fast, unittest.F2B.no_gamin)) from ..server.filtergamin import FilterGamin filters.append(FilterGamin) - except Exception, e: # pragma: no cover + except ImportError, e: # pragma: no cover logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e) try: from ..server.filterpyinotify import FilterPyinotify filters.append(FilterPyinotify) - except Exception, e: # pragma: no cover + except ImportError, e: # pragma: no cover logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e) for Filter_ in filters: @@ -448,7 +448,7 @@ def gatherTests(regexps=None, opts=None): try: # pragma: systemd no cover from ..server.filtersystemd import FilterSystemd tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd))) - except Exception, e: # pragma: no cover + except ImportError, e: # pragma: no cover logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e) # Server test for logging elements which break logging used to support