mirror of https://github.com/fail2ban/fail2ban
filter test cases improved + log captured inside such tests + python 3.x compatibility;
changelog entry;pull/1249/head
parent
a42aa726ab
commit
46b116e86a
|
@ -24,6 +24,8 @@ ver. 0.9.4 (2015/XX/XXX) - wanna-be-released
|
||||||
* Fix jail.conf.5 man's section (gh-1226)
|
* Fix jail.conf.5 man's section (gh-1226)
|
||||||
* Fixed default banaction for allports jails like pam-generic, recidive, etc
|
* Fixed default banaction for allports jails like pam-generic, recidive, etc
|
||||||
with new default variable `banaction_allports` (gh-1216)
|
with new default variable `banaction_allports` (gh-1216)
|
||||||
|
* Fixed `fail2ban-regex` stops working on invalid (wrong encoded) character
|
||||||
|
for python version < 3.x (gh-1248)
|
||||||
|
|
||||||
- New Features:
|
- New Features:
|
||||||
* New filters:
|
* New filters:
|
||||||
|
|
|
@ -822,7 +822,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
|
||||||
return MonitorJournalFailures
|
return MonitorJournalFailures
|
||||||
|
|
||||||
|
|
||||||
class GetFailures(unittest.TestCase):
|
class GetFailures(LogCaptureTestCase):
|
||||||
|
|
||||||
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
|
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
|
||||||
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
|
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
|
||||||
|
@ -837,6 +837,7 @@ class GetFailures(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
"""Call before every test case."""
|
"""Call before every test case."""
|
||||||
|
LogCaptureTestCase.setUp(self)
|
||||||
setUpMyTime()
|
setUpMyTime()
|
||||||
self.jail = DummyJail()
|
self.jail = DummyJail()
|
||||||
self.filter = FileFilter(self.jail)
|
self.filter = FileFilter(self.jail)
|
||||||
|
@ -848,6 +849,7 @@ class GetFailures(unittest.TestCase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
"""Call after every test case."""
|
"""Call after every test case."""
|
||||||
tearDownMyTime()
|
tearDownMyTime()
|
||||||
|
LogCaptureTestCase.tearDown(self)
|
||||||
|
|
||||||
def testTail(self):
|
def testTail(self):
|
||||||
self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
|
self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
|
||||||
|
@ -929,20 +931,20 @@ class GetFailures(unittest.TestCase):
|
||||||
output = ('192.0.2.0', 3, 1421262060.0)
|
output = ('192.0.2.0', 3, 1421262060.0)
|
||||||
failregex = "^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"
|
failregex = "^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"
|
||||||
|
|
||||||
# encoding - auto
|
# test encoding auto or direct set of encoding:
|
||||||
self.filter.addLogPath(fname)
|
for enc in (None, 'utf-8', 'ascii'):
|
||||||
self.filter.addFailRegex(failregex)
|
if enc is not None:
|
||||||
self.filter.getFailures(fname)
|
self.tearDown();self.setUp();
|
||||||
_assert_correct_last_attempt(self, self.filter, output)
|
self.filter.setLogEncoding(enc);
|
||||||
|
self.assertNotLogged('Error decoding line');
|
||||||
# test direct set of encoding:
|
|
||||||
for enc in ('utf-8', 'ascii'):
|
|
||||||
self.tearDown();self.setUp();
|
|
||||||
self.filter.setLogEncoding('utf-8');
|
|
||||||
self.filter.addLogPath(fname)
|
self.filter.addLogPath(fname)
|
||||||
self.filter.addFailRegex(failregex)
|
self.filter.addFailRegex(failregex)
|
||||||
self.filter.getFailures(fname)
|
self.filter.getFailures(fname)
|
||||||
_assert_correct_last_attempt(self, self.filter, output)
|
_assert_correct_last_attempt(self, self.filter, output)
|
||||||
|
|
||||||
|
self.assertLogged('Error decoding line');
|
||||||
|
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user ');
|
||||||
|
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user ');
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
_killfile(fout, fname)
|
_killfile(fout, fname)
|
||||||
|
|
Loading…
Reference in New Issue