From c7ae74ce178adb819cdb3342a248c4e30346bd33 Mon Sep 17 00:00:00 2001 From: sebres Date: Tue, 8 Feb 2022 19:10:22 +0100 Subject: [PATCH] amend to a147a8b0e1b2f32b6f191932afd3c2db9765e2e3: systemd journal test-cases - additional check appropriate default settings (if testing as not root/sudoer) --- fail2ban/tests/filtertestcase.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 32dcca5c..f9fa5e97 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -1352,7 +1352,6 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover def setUp(self): """Call before every test case.""" super(MonitorJournalFailures, self).setUp() - self._runtimeJournal = None self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log") self.jail = DummyJail() self.filter = None @@ -1390,7 +1389,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover If not found, SkipTest exception will be raised. """ # we can cache it: - if self._runtimeJournal is None: + if not hasattr(MonitorJournalFailures, "_runtimeJournal"): # Depending on the system, it could be found under /run or /var/log (e.g. Debian) # which are pointed by different systemd-path variables. We will # check one at at time until the first hit @@ -1402,9 +1401,14 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.assertTrue(tmp) out = str(tmp[1].decode('utf-8')).split('\n')[0] if out: break - self._runtimeJournal = out - if self._runtimeJournal: - return self._runtimeJournal + # additional check appropriate default settings (if not root/sudoer and not already set): + if os.geteuid() != 0 and os.getenv("F2B_SYSTEMD_DEFAULT_FLAGS", None) is None: + # filter default SYSTEM_ONLY(4) is hardly usable for not root/sudoer tester, + # so back to default LOCAL_ONLY(1): + os.environ["F2B_SYSTEMD_DEFAULT_FLAGS"] = "0"; # or "1", what will be similar to journalflags=0 or ...=1 + MonitorJournalFailures._runtimeJournal = out + if MonitorJournalFailures._runtimeJournal: + return MonitorJournalFailures._runtimeJournal raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)') def testJournalFilesArg(self):