From 191d1e953340a20310bcfdb1e02e4dfc683b9f07 Mon Sep 17 00:00:00 2001 From: sebres Date: Sun, 30 Mar 2025 05:52:42 +0200 Subject: [PATCH] improve threaded clean-up of filters, new functions `afterStop` (to force clean-up after stop) and `done`, invoking `afterStop` once; ensure journal-reader is always closed (prevention against "too many open files"), thereby avoid sporadic segfault in systemd module (https://github.com/systemd/python-systemd/issues/143) --- fail2ban/server/filter.py | 17 ++++------------- fail2ban/server/filterpyinotify.py | 9 ++++----- fail2ban/server/filtersystemd.py | 10 ++++------ fail2ban/server/jail.py | 4 +++- fail2ban/server/jailthread.py | 22 ++++++++++++++++++++-- fail2ban/tests/filtertestcase.py | 2 +- 6 files changed, 36 insertions(+), 28 deletions(-) diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index d132ce6a..210ca084 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -1288,24 +1288,15 @@ class FileFilter(Filter): break db.updateLog(self.jail, log) - def onStop(self): + def afterStop(self): """Stop monitoring of log-file(s). Invoked after run method. """ - # ensure positions of pending logs are up-to-date: - if self._pendDBUpdates and self.jail.database: - self._updateDBPending() # stop files monitoring: for path in list(self.__logs.keys()): self.delLogPath(path) - - def stop(self): - """Stop filter - """ - # normally onStop will be called automatically in thread after its run ends, - # but for backwards compatibilities we'll invoke it in caller of stop method. - self.onStop() - # stop thread: - super(Filter, self).stop() + # ensure positions of pending logs are up-to-date: + if self._pendDBUpdates and self.jail.database: + self._updateDBPending() ## # FileContainer class. diff --git a/fail2ban/server/filterpyinotify.py b/fail2ban/server/filterpyinotify.py index 9fcbcb56..319314b7 100644 --- a/fail2ban/server/filterpyinotify.py +++ b/fail2ban/server/filterpyinotify.py @@ -367,19 +367,18 @@ class FilterPyinotify(FileFilter): self.commonError("unhandled", e) logSys.debug("[%s] filter exited (pyinotifier)", self.jailName) - self.__notifier = None + self.done() return True ## - # Call super.stop() and then stop the 'Notifier' + # Clean-up: then stop the 'Notifier' - def stop(self): - # stop filter thread: - super(FilterPyinotify, self).stop() + def afterStop(self): try: if self.__notifier: # stop the notifier self.__notifier.stop() + self.__notifier = None except AttributeError: # pragma: no cover if self.__notifier: raise diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index fc894457..cfab35a4 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -456,8 +456,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover logSys.debug("[%s] filter terminated", self.jailName) - # close journal: - self.closeJournal() + # call afterStop once (close journal, etc): + self.done() logSys.debug("[%s] filter exited (systemd)", self.jailName) return True @@ -491,12 +491,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover break db.updateJournal(self.jail, log, *args) - def onStop(self): - """Stop monitoring of journal. Invoked after run method. - """ + def afterStop(self): + """Cleanup""" # close journal: self.closeJournal() # ensure positions of pending logs are up-to-date: if self._pendDBUpdates and self.jail.database: self._updateDBPending() - diff --git a/fail2ban/server/jail.py b/fail2ban/server/jail.py index 0f8e3566..7e52a01a 100644 --- a/fail2ban/server/jail.py +++ b/fail2ban/server/jail.py @@ -335,7 +335,9 @@ class Jail(object): try: ## signal to stop filter / actions: if stop: - obj.stop() + if obj.isAlive(): + obj.stop() + obj.done(); # and clean-up everything ## wait for end of threads: if join: obj.join() diff --git a/fail2ban/server/jailthread.py b/fail2ban/server/jailthread.py index ccb3fe5c..99e3e629 100644 --- a/fail2ban/server/jailthread.py +++ b/fail2ban/server/jailthread.py @@ -103,7 +103,21 @@ class JailThread(Thread): def stop(self): """Sets `active` property to False, to flag run method to return. """ - self.active = False + if self.active: self.active = False + # normally onStop will be called automatically in thread after its run ends, + # but for backwards compatibilities we'll invoke it in caller of stop method. + self.onStop() + self.onStop = lambda:() + self.done() + + def done(self): + self.done = lambda:() + # if still runniung - wait a bit before initiate clean-up: + if self.is_alive(): + Utils.wait_for(lambda: not self.is_alive(), 5) + # now clean-up everything: + self.afterStop() + @abstractmethod def run(self): # pragma: no cover - absract @@ -111,11 +125,15 @@ class JailThread(Thread): """ pass + def afterStop(self): + """Cleanup resources.""" + pass + def join(self): """ Safer join, that could be called also for not started (or ended) threads (used for cleanup). """ ## if cleanup needed - create derivative and call it before join... - + self.done() ## if was really started - should call join: if self.active is not None: super(JailThread, self).join() diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index d12c93bc..c9dd4b56 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -1477,7 +1477,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) ") def tearDown(self): - if self.filter and self.filter.active: + if self.filter and self.filter.active or self.filter.active is None: self.filter.stop() self.filter.join() # wait for the thread to terminate super(MonitorJournalFailures, self).tearDown()