From 1c4733ef89446c42a01a09c2f781d55d4303b246 Mon Sep 17 00:00:00 2001 From: sebres Date: Wed, 24 Aug 2016 18:29:12 +0200 Subject: [PATCH 1/4] [systemd] added new constructor parameters like journalpath, journalfiles and journalflags for systemd backup optimized FilterSystemd method `run`: better wait in idle (no busy-loop), better poll handling, the ban will executed anywhere (at least at 100th log-entry), also if we have never ending logging in this jail (e.g. extremely logging or too many failures) systemd test cases extended --- fail2ban/server/filtersystemd.py | 97 +++++++++++++++++++++++--------- fail2ban/tests/filtertestcase.py | 25 +++++--- 2 files changed, 88 insertions(+), 34 deletions(-) diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index d0ebec95..9c4c7865 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -33,7 +33,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204': from .failmanager import FailManagerEmpty from .filter import JournalFilter from .mytime import MyTime -from ..helpers import getLogger +from ..helpers import getLogger, logging, splitwords # Gets the instance of the logger. logSys = getLogger(__name__) @@ -54,14 +54,45 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover # @param jail the jail object def __init__(self, jail, **kwargs): + jrnlargs = FilterSystemd._getJournalArgs(kwargs) JournalFilter.__init__(self, jail, **kwargs) - self.__modified = False + self.__modified = 0 # Initialise systemd-journal connection - self.__journal = journal.Reader(converters={'__CURSOR': lambda x: x}) + self.__journal = journal.Reader(**jrnlargs) self.__matches = [] self.setDatePattern(None) + self.ticks = 0 logSys.debug("Created FilterSystemd") + @staticmethod + def _getJournalArgs(kwargs): + args = {'converters':{'__CURSOR': lambda x: x}} + try: + args['path'] = kwargs.pop('journalpath') + except KeyError: + pass + + try: + args['files'] = kwargs.pop('journalfiles') + except KeyError: + pass + else: + import glob + p = args['files'] + if not isinstance(p, (list, set, tuple)): + p = splitwords(p) + files = [] + for p in p: + files.extend(glob.glob(p)) + args['files'] = list(set(files)) + + try: + args['flags'] = kwargs.pop('journalflags') + except KeyError: + pass + + return args + ## # Add a journal match filters from list structure # @@ -207,6 +238,11 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover return (('', date.isoformat(), logline), time.mktime(date.timetuple()) + date.microsecond/1.0E6) + def seekToTime(self, date): + if not isinstance(date, datetime.datetime): + date = datetime.datetime.fromtimestamp(date) + self.__journal.seek_realtime(date) + ## # Main loop. # @@ -224,7 +260,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover # Seek to now - findtime in journal start_time = datetime.datetime.now() - \ datetime.timedelta(seconds=int(self.getFindTime())) - self.__journal.seek_realtime(start_time) + self.seekToTime(start_time) # Move back one entry to ensure do not end up in dead space # if start time beyond end of journal try: @@ -233,29 +269,38 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover pass # Reading failure, so safe to ignore while self.active: - if not self.idle: - while self.active: - try: - logentry = self.__journal.get_next() - except OSError: - logSys.warning( - "Error reading line from systemd journal") - continue - if logentry: - self.processLineAndAdd( - *self.formatJournalEntry(logentry)) - self.__modified = True - else: - break - if self.__modified: - try: - while True: - ticket = self.failManager.toBan() - self.jail.putFailTicket(ticket) - except FailManagerEmpty: - self.failManager.cleanup(MyTime.time()) - self.__modified = False + # wait for records (or for timeout in sleeptime seconds): self.__journal.wait(self.sleeptime) + if self.idle: + # because journal.wait will returns immediatelly if we have records in journal, + # just wait a little bit here for not idle, to prevent hi-load: + time.sleep(self.sleeptime) + continue + self.__modified = 0 + while self.active: + logentry = None + try: + logentry = self.__journal.get_next() + except OSError as e: + logSys.error("Error reading line from systemd journal: %s", + e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) + self.ticks += 1 + if logentry: + self.processLineAndAdd( + *self.formatJournalEntry(logentry)) + self.__modified += 1 + if self.__modified >= 100: # todo: should be configurable + break + else: + break + if self.__modified: + try: + while True: + ticket = self.failManager.toBan() + self.jail.putFailTicket(ticket) + except FailManagerEmpty: + self.failManager.cleanup(MyTime.time()) + logSys.debug((self.jail is not None and self.jail.name or "jailless") +" filter terminated") return True diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 40879b66..6194243b 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -707,11 +707,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover """Call before every test case.""" self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log") self.jail = DummyJail() - self.filter = Filter_(self.jail) + self.filter = None # UUID used to ensure that only meeages generated # as part of this test are picked up by the filter self.test_uuid = str(uuid.uuid4()) self.name = "monitorjournalfailures-%s" % self.test_uuid + self.journal_fields = { + 'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid} + + def _initFilter(self, **kwargs): + self.filter = Filter_(self.jail, **kwargs) self.filter.addJournalMatch([ "SYSLOG_IDENTIFIER=fail2ban-testcases", "TEST_FIELD=1", @@ -720,16 +725,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover "SYSLOG_IDENTIFIER=fail2ban-testcases", "TEST_FIELD=2", "TEST_UUID=%s" % self.test_uuid]) - self.journal_fields = { - 'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid} - self.filter.active = True self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") - self.filter.start() def tearDown(self): - self.filter.stop() - self.filter.join() # wait for the thread to terminate - pass + if self.filter and self.filter.active: + self.filter.stop() + self.filter.join() # wait for the thread to terminate + pass + + def testJournalFlagsArg(self): + self._initFilter(journalflags=0) # journal.RUNTIME_ONLY def __str__(self): return "MonitorJournalFailures%s(%s)" \ @@ -761,6 +766,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assertEqual(attempts, test_attempts) def test_grow_file(self): + self._initFilter() + self.filter.start() self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) # Now let's feed it with entries from the file @@ -790,6 +797,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assert_correct_ban("193.168.0.128", 3) def test_delJournalMatch(self): + self._initFilter() + self.filter.start() # Smoke test for removing of match # basic full test From 7ed6cab1203c33d27a18e43c5809fbdc9e4d224a Mon Sep 17 00:00:00 2001 From: sebres Date: Wed, 24 Aug 2016 19:09:47 +0200 Subject: [PATCH 2/4] jail configuration extended with new syntax to pass options to the backend (see gh-1408), examples: - `backend = systemd[journalpath=/run/log/journal/machine-1]` - `backend = systemd[journalfiles="/run/log/journal/machine-1/system.journal, /run/log/journal/machine-1/user.journal"]` - `backend = systemd[journalflags=2]` --- ChangeLog | 5 +++++ fail2ban/server/jail.py | 28 +++++++++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7405749c..5f97c996 100644 --- a/ChangeLog +++ b/ChangeLog @@ -54,6 +54,11 @@ releases. * New forward compatibility method assertRaisesRegexp (normally python >= 2.7). Methods assertIn, assertNotIn, assertRaisesRegexp, assertLogged, assertNotLogged are test covered now +* Jail configuration extended with new syntax to pass options to the backend (see gh-1408), + examples: + - `backend = systemd[journalpath=/run/log/journal/machine-1]` + - `backend = systemd[journalfiles="/run/log/journal/machine-1/system.journal, /run/log/journal/machine-1/user.journal"]` + - `backend = systemd[journalflags=2]` ver. 0.9.5 (2016/07/15) - old-not-obsolete diff --git a/fail2ban/server/jail.py b/fail2ban/server/jail.py index a866cb51..951a9891 100644 --- a/fail2ban/server/jail.py +++ b/fail2ban/server/jail.py @@ -27,6 +27,7 @@ import logging import Queue from .actions import Actions +from ..client.jailreader import JailReader from ..helpers import getLogger # Gets the instance of the logger. @@ -82,6 +83,7 @@ class Jail: return "%s(%r)" % (self.__class__.__name__, self.name) def _setBackend(self, backend): + backend, beArgs = JailReader.extractOptions(backend) backend = backend.lower() # to assure consistent matching backends = self._BACKENDS @@ -98,7 +100,7 @@ class Jail: for b in backends: initmethod = getattr(self, '_init%s' % b.capitalize()) try: - initmethod() + initmethod(**beArgs) if backend != 'auto' and b != backend: logSys.warning("Could only initiated %r backend whenever " "%r was requested" % (b, backend)) @@ -117,28 +119,28 @@ class Jail: raise RuntimeError( "Failed to initialize any backend for Jail %r" % self.name) - def _initPolling(self): + def _initPolling(self, **kwargs): from filterpoll import FilterPoll - logSys.info("Jail '%s' uses poller" % self.name) - self.__filter = FilterPoll(self) + logSys.info("Jail '%s' uses poller %r" % (self.name, kwargs)) + self.__filter = FilterPoll(self, **kwargs) - def _initGamin(self): + def _initGamin(self, **kwargs): # Try to import gamin from filtergamin import FilterGamin - logSys.info("Jail '%s' uses Gamin" % self.name) - self.__filter = FilterGamin(self) + logSys.info("Jail '%s' uses Gamin %r" % (self.name, kwargs)) + self.__filter = FilterGamin(self, **kwargs) - def _initPyinotify(self): + def _initPyinotify(self, **kwargs): # Try to import pyinotify from filterpyinotify import FilterPyinotify - logSys.info("Jail '%s' uses pyinotify" % self.name) - self.__filter = FilterPyinotify(self) + logSys.info("Jail '%s' uses pyinotify %r" % (self.name, kwargs)) + self.__filter = FilterPyinotify(self, **kwargs) - def _initSystemd(self): # pragma: systemd no cover + def _initSystemd(self, **kwargs): # pragma: systemd no cover # Try to import systemd from filtersystemd import FilterSystemd - logSys.info("Jail '%s' uses systemd" % self.name) - self.__filter = FilterSystemd(self) + logSys.info("Jail '%s' uses systemd %r" % (self.name, kwargs)) + self.__filter = FilterSystemd(self, **kwargs) @property def name(self): From 35b5fea038ca76a19b40b40a162affaa9bd6874c Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 1 Sep 2016 15:44:32 +0200 Subject: [PATCH 3/4] backend "systemd" can be used as prefix now - `backend = systemd[...]` --- fail2ban/client/jailreader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fail2ban/client/jailreader.py b/fail2ban/client/jailreader.py index c86c3153..b29d12fa 100644 --- a/fail2ban/client/jailreader.py +++ b/fail2ban/client/jailreader.py @@ -192,7 +192,7 @@ class JailReader(ConfigReader): stream = [] for opt in self.__opts: if opt == "logpath" and \ - self.__opts.get('backend', None) != "systemd": + not self.__opts.get('backend', None).startswith("systemd"): found_files = 0 for path in self.__opts[opt].split("\n"): path = path.rsplit(" ", 1) From 5f35b52b9af46edab728742488e8a8afeed850ed Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 1 Sep 2016 15:47:17 +0200 Subject: [PATCH 4/4] test cases extended several test-case functionality cherry picked from 0.10 (SkipTest, with_tmpdir) --- fail2ban/tests/clientreadertestcase.py | 54 +++++++++++++++++++------- fail2ban/tests/databasetestcase.py | 4 +- fail2ban/tests/filtertestcase.py | 2 +- fail2ban/tests/servertestcase.py | 9 +---- fail2ban/tests/utils.py | 41 ++++++++++++++++++- 5 files changed, 82 insertions(+), 28 deletions(-) diff --git a/fail2ban/tests/clientreadertestcase.py b/fail2ban/tests/clientreadertestcase.py index ee362e3d..dd98374c 100644 --- a/fail2ban/tests/clientreadertestcase.py +++ b/fail2ban/tests/clientreadertestcase.py @@ -36,7 +36,7 @@ from ..client.jailsreader import JailsReader from ..client.actionreader import ActionReader from ..client.configurator import Configurator from ..version import version -from .utils import LogCaptureTestCase +from .utils import LogCaptureTestCase, with_tmpdir TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") @@ -94,9 +94,8 @@ option = %s if not os.access(f, os.R_OK): self.assertFalse(self.c.read('d')) # should not be readable BUT present else: - # SkipTest introduced only in 2.7 thus can't yet use generally - # raise unittest.SkipTest("Skipping on %s -- access rights are not enforced" % platform) - pass + import platform + raise unittest.SkipTest("Skipping on %s -- access rights are not enforced" % platform.platform()) def testOptionalDotDDir(self): self.assertFalse(self.c.read('c')) # nothing is there yet @@ -281,8 +280,8 @@ class JailReaderTest(LogCaptureTestCase): self.assertEqual(eval(act[2][5]).get('agent', ''), useragent) self.assertEqual(act[3], ['set', 'blocklisttest', 'action', 'mynetwatchman', 'agent', useragent]) - def testGlob(self): - d = tempfile.mkdtemp(prefix="f2b-temp") + @with_tmpdir + def testGlob(self, d): # Generate few files # regular file f1 = os.path.join(d, 'f1') @@ -297,9 +296,6 @@ class JailReaderTest(LogCaptureTestCase): self.assertEqual(JailReader._glob(f2), []) self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2) self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), []) - os.remove(f1) - os.remove(f2) - os.rmdir(d) class FilterReaderTest(unittest.TestCase): @@ -433,10 +429,10 @@ class JailsReaderTestCache(LogCaptureTestCase): cnt += 1 return cnt - def testTestJailConfCache(self): + @with_tmpdir + def testTestJailConfCache(self, basedir): saved_ll = configparserinc.logLevel configparserinc.logLevel = logging.DEBUG - basedir = tempfile.mkdtemp("fail2ban_conf") try: shutil.rmtree(basedir) shutil.copytree(CONFIG_DIR, basedir) @@ -468,7 +464,6 @@ class JailsReaderTestCache(LogCaptureTestCase): cnt = self._getLoggedReadCount(r'action\.d/iptables-common\.conf') self.assertTrue(cnt == 1, "Unexpected count by reading of action files, cnt = %s" % cnt) finally: - shutil.rmtree(basedir) configparserinc.logLevel = saved_ll @@ -718,8 +713,8 @@ class JailsReaderTest(LogCaptureTestCase): self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp') self.assertEqual(configurator.getBaseDir(), CONFIG_DIR) - def testMultipleSameAction(self): - basedir = tempfile.mkdtemp("fail2ban_conf") + @with_tmpdir + def testMultipleSameAction(self, basedir): os.mkdir(os.path.join(basedir, "filter.d")) os.mkdir(os.path.join(basedir, "action.d")) open(os.path.join(basedir, "action.d", "testaction1.conf"), 'w').close() @@ -748,4 +743,33 @@ filter = testfilter1 # Python actions should not be passed `actname` self.assertEqual(add_actions[-1][-1], "{}") - shutil.rmtree(basedir) + def testLogPathFileFilterBackend(self): + self.assertRaisesRegexp(ValueError, r"Have not found any log file for .* jail", + self._testLogPath, backend='polling') + + def testLogPathSystemdBackend(self): + try: # pragma: systemd no cover + from ..server.filtersystemd import FilterSystemd + except Exception, e: # pragma: no cover + raise unittest.SkipTest("systemd python interface not available") + self._testLogPath(backend='systemd') + self._testLogPath(backend='systemd[journalflags=2]') + + @with_tmpdir + def _testLogPath(self, basedir, backend): + jailfd = open(os.path.join(basedir, "jail.conf"), 'w') + jailfd.write(""" +[testjail1] +enabled = true +backend = %s +logpath = %s/not/exist.log + /this/path/should/not/exist.log +action = +filter = +failregex = test +""" % (backend, basedir)) + jailfd.close() + jails = JailsReader(basedir=basedir) + self.assertTrue(jails.read()) + self.assertTrue(jails.getOptions()) + jails.convert() diff --git a/fail2ban/tests/databasetestcase.py b/fail2ban/tests/databasetestcase.py index e934ba45..2b599f61 100644 --- a/fail2ban/tests/databasetestcase.py +++ b/fail2ban/tests/databasetestcase.py @@ -48,12 +48,10 @@ class DatabaseTest(LogCaptureTestCase): def setUp(self): """Call before every test case.""" super(DatabaseTest, self).setUp() - if Fail2BanDb is None and sys.version_info >= (2,7): # pragma: no cover + if Fail2BanDb is None: # pragma: no cover raise unittest.SkipTest( "Unable to import fail2ban database module as sqlite is not " "available.") - elif Fail2BanDb is None: - return _, self.dbFilename = tempfile.mkstemp(".db", "fail2ban_") self.db = Fail2BanDb(self.dbFilename) diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 6194243b..4f9d8a06 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -734,7 +734,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover pass def testJournalFlagsArg(self): - self._initFilter(journalflags=0) # journal.RUNTIME_ONLY + self._initFilter(journalflags=2) # journal.RUNTIME_ONLY def __str__(self): return "MonitorJournalFailures%s(%s)" \ diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index 21e2d784..99502794 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -688,10 +688,7 @@ class Transmitter(TransmitterBase): def testJournalMatch(self): if not filtersystemd: # pragma: no cover - if sys.version_info >= (2, 7): - raise unittest.SkipTest( - "systemd python interface not available") - return + raise unittest.SkipTest("systemd python interface not available") jailName = "TestJail2" self.server.addJail(jailName, "systemd") values = [ @@ -791,10 +788,8 @@ class TransmitterLogging(TransmitterBase): self.setGetTest("logtarget", "STDERR") def testLogTargetSYSLOG(self): - if not os.path.exists("/dev/log") and sys.version_info >= (2, 7): + if not os.path.exists("/dev/log"): raise unittest.SkipTest("'/dev/log' not present") - elif not os.path.exists("/dev/log"): - return self.assertTrue(self.server.getSyslogSocket(), "auto") self.setGetTest("logtarget", "SYSLOG") self.assertTrue(self.server.getSyslogSocket(), "/dev/log") diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index e091c935..05d9d5ed 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -26,10 +26,13 @@ import itertools import logging import os import re +import tempfile +import shutil import sys import time import unittest from StringIO import StringIO +from functools import wraps from ..server.mytime import MyTime from ..helpers import getLogger @@ -46,6 +49,40 @@ if not CONFIG_DIR: CONFIG_DIR = '/etc/fail2ban' +def with_tmpdir(f): + """Helper decorator to create a temporary directory + + Directory gets removed after function returns, regardless + if exception was thrown of not + """ + @wraps(f) + def wrapper(self, *args, **kwargs): + tmp = tempfile.mkdtemp(prefix="f2b-temp") + try: + return f(self, tmp, *args, **kwargs) + finally: + # clean up + shutil.rmtree(tmp) + return wrapper + + +# backwards compatibility to python 2.6: +if not hasattr(unittest, 'SkipTest'): # pragma: no cover + class SkipTest(Exception): + pass + unittest.SkipTest = SkipTest + _org_AddError = unittest._TextTestResult.addError + def addError(self, test, err): + if err[0] is SkipTest: + if self.showAll: + self.stream.writeln(str(err[1])) + elif self.dots: + self.stream.write('s') + self.stream.flush() + return + _org_AddError(self, test, err) + unittest._TextTestResult.addError = addError + def mtimesleep(): # no sleep now should be necessary since polling tracks now not only # mtime but also ino and size @@ -218,8 +255,8 @@ if not hasattr(unittest.TestCase, 'assertRaisesRegexp'): try: fun(*args, **kwargs) except exccls as e: - if re.search(regexp, e.message) is None: - self.fail('\"%s\" does not match \"%s\"' % (regexp, e.message)) + if re.search(regexp, str(e)) is None: + self.fail('\"%s\" does not match \"%s\"' % (regexp, e)) else: self.fail('%s not raised' % getattr(exccls, '__name__')) unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp