mirror of https://github.com/fail2ban/fail2ban
Skip several test-cases of systemd backend, if journal seems to be not available (e. g. no rights to read journal);
Closes gh-2100pull/2090/head^2
parent
fd0471927d
commit
50d7c649ba
|
@ -1145,6 +1145,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
"""Call before every test case."""
|
"""Call before every test case."""
|
||||||
super(MonitorJournalFailures, self).setUp()
|
super(MonitorJournalFailures, self).setUp()
|
||||||
|
self._runtimeJournal = None
|
||||||
self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log")
|
self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log")
|
||||||
self.jail = DummyJail()
|
self.jail = DummyJail()
|
||||||
self.filter = None
|
self.filter = None
|
||||||
|
@ -1156,6 +1157,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
|
||||||
'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}
|
'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}
|
||||||
|
|
||||||
def _initFilter(self, **kwargs):
|
def _initFilter(self, **kwargs):
|
||||||
|
self._getRuntimeJournal() # check journal available
|
||||||
self.filter = Filter_(self.jail, **kwargs)
|
self.filter = Filter_(self.jail, **kwargs)
|
||||||
self.filter.addJournalMatch([
|
self.filter.addJournalMatch([
|
||||||
"SYSLOG_IDENTIFIER=fail2ban-testcases",
|
"SYSLOG_IDENTIFIER=fail2ban-testcases",
|
||||||
|
@ -1176,21 +1178,26 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
|
||||||
def _getRuntimeJournal(self):
|
def _getRuntimeJournal(self):
|
||||||
"""Retrieve current system journal path
|
"""Retrieve current system journal path
|
||||||
|
|
||||||
If none found, None will be returned
|
If not found, SkipTest exception will be raised.
|
||||||
"""
|
"""
|
||||||
# Depending on the system, it could be found under /run or /var/log (e.g. Debian)
|
# we can cache it:
|
||||||
# which are pointed by different systemd-path variables. We will
|
if self._runtimeJournal is None:
|
||||||
# check one at at time until the first hit
|
# Depending on the system, it could be found under /run or /var/log (e.g. Debian)
|
||||||
for systemd_var in 'system-runtime-logs', 'system-state-logs':
|
# which are pointed by different systemd-path variables. We will
|
||||||
tmp = Utils.executeCmd(
|
# check one at at time until the first hit
|
||||||
'find "$(systemd-path %s)" -name system.journal' % systemd_var,
|
for systemd_var in 'system-runtime-logs', 'system-state-logs':
|
||||||
timeout=10, shell=True, output=True
|
tmp = Utils.executeCmd(
|
||||||
)
|
'find "$(systemd-path %s)" -name system.journal' % systemd_var,
|
||||||
self.assertTrue(tmp)
|
timeout=10, shell=True, output=True
|
||||||
out = str(tmp[1].decode('utf-8')).split('\n')[0]
|
)
|
||||||
if out:
|
self.assertTrue(tmp)
|
||||||
return out
|
out = str(tmp[1].decode('utf-8')).split('\n')[0]
|
||||||
|
if out: break
|
||||||
|
self._runtimeJournal = out
|
||||||
|
if self._runtimeJournal:
|
||||||
|
return self._runtimeJournal
|
||||||
|
raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)')
|
||||||
|
|
||||||
def testJournalFilesArg(self):
|
def testJournalFilesArg(self):
|
||||||
# retrieve current system journal path
|
# retrieve current system journal path
|
||||||
jrnlfile = self._getRuntimeJournal()
|
jrnlfile = self._getRuntimeJournal()
|
||||||
|
|
Loading…
Reference in New Issue