improve threaded clean-up of filters, new functions `afterStop` (to force clean-up after stop) and `done`, invoking `afterStop` once; ensure journal-reader is always closed (prevention against "too many open files"), thereby avoid sporadic segfault in systemd module (https://github.com/systemd/python-systemd/issues/143)

pull/3927/merge
sebres 2025-03-30 05:52:42 +02:00
parent 9f0b6382bf
commit 191d1e9533
6 changed files with 36 additions and 28 deletions

View File

@ -1288,24 +1288,15 @@ class FileFilter(Filter):
break break
db.updateLog(self.jail, log) db.updateLog(self.jail, log)
def onStop(self): def afterStop(self):
"""Stop monitoring of log-file(s). Invoked after run method. """Stop monitoring of log-file(s). Invoked after run method.
""" """
# ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()
# stop files monitoring: # stop files monitoring:
for path in list(self.__logs.keys()): for path in list(self.__logs.keys()):
self.delLogPath(path) self.delLogPath(path)
# ensure positions of pending logs are up-to-date:
def stop(self): if self._pendDBUpdates and self.jail.database:
"""Stop filter self._updateDBPending()
"""
# normally onStop will be called automatically in thread after its run ends,
# but for backwards compatibilities we'll invoke it in caller of stop method.
self.onStop()
# stop thread:
super(Filter, self).stop()
## ##
# FileContainer class. # FileContainer class.

View File

@ -367,19 +367,18 @@ class FilterPyinotify(FileFilter):
self.commonError("unhandled", e) self.commonError("unhandled", e)
logSys.debug("[%s] filter exited (pyinotifier)", self.jailName) logSys.debug("[%s] filter exited (pyinotifier)", self.jailName)
self.__notifier = None self.done()
return True return True
## ##
# Call super.stop() and then stop the 'Notifier' # Clean-up: then stop the 'Notifier'
def stop(self): def afterStop(self):
# stop filter thread:
super(FilterPyinotify, self).stop()
try: try:
if self.__notifier: # stop the notifier if self.__notifier: # stop the notifier
self.__notifier.stop() self.__notifier.stop()
self.__notifier = None
except AttributeError: # pragma: no cover except AttributeError: # pragma: no cover
if self.__notifier: raise if self.__notifier: raise

View File

@ -456,8 +456,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
logSys.debug("[%s] filter terminated", self.jailName) logSys.debug("[%s] filter terminated", self.jailName)
# close journal: # call afterStop once (close journal, etc):
self.closeJournal() self.done()
logSys.debug("[%s] filter exited (systemd)", self.jailName) logSys.debug("[%s] filter exited (systemd)", self.jailName)
return True return True
@ -491,12 +491,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
break break
db.updateJournal(self.jail, log, *args) db.updateJournal(self.jail, log, *args)
def onStop(self): def afterStop(self):
"""Stop monitoring of journal. Invoked after run method. """Cleanup"""
"""
# close journal: # close journal:
self.closeJournal() self.closeJournal()
# ensure positions of pending logs are up-to-date: # ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database: if self._pendDBUpdates and self.jail.database:
self._updateDBPending() self._updateDBPending()

View File

@ -335,7 +335,9 @@ class Jail(object):
try: try:
## signal to stop filter / actions: ## signal to stop filter / actions:
if stop: if stop:
obj.stop() if obj.isAlive():
obj.stop()
obj.done(); # and clean-up everything
## wait for end of threads: ## wait for end of threads:
if join: if join:
obj.join() obj.join()

View File

@ -103,7 +103,21 @@ class JailThread(Thread):
def stop(self): def stop(self):
"""Sets `active` property to False, to flag run method to return. """Sets `active` property to False, to flag run method to return.
""" """
self.active = False if self.active: self.active = False
# normally onStop will be called automatically in thread after its run ends,
# but for backwards compatibilities we'll invoke it in caller of stop method.
self.onStop()
self.onStop = lambda:()
self.done()
def done(self):
self.done = lambda:()
# if still runniung - wait a bit before initiate clean-up:
if self.is_alive():
Utils.wait_for(lambda: not self.is_alive(), 5)
# now clean-up everything:
self.afterStop()
@abstractmethod @abstractmethod
def run(self): # pragma: no cover - absract def run(self): # pragma: no cover - absract
@ -111,11 +125,15 @@ class JailThread(Thread):
""" """
pass pass
def afterStop(self):
"""Cleanup resources."""
pass
def join(self): def join(self):
""" Safer join, that could be called also for not started (or ended) threads (used for cleanup). """ Safer join, that could be called also for not started (or ended) threads (used for cleanup).
""" """
## if cleanup needed - create derivative and call it before join... ## if cleanup needed - create derivative and call it before join...
self.done()
## if was really started - should call join: ## if was really started - should call join:
if self.active is not None: if self.active is not None:
super(JailThread, self).join() super(JailThread, self).join()

View File

@ -1477,7 +1477,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
def tearDown(self): def tearDown(self):
if self.filter and self.filter.active: if self.filter and self.filter.active or self.filter.active is None:
self.filter.stop() self.filter.stop()
self.filter.join() # wait for the thread to terminate self.filter.join() # wait for the thread to terminate
super(MonitorJournalFailures, self).tearDown() super(MonitorJournalFailures, self).tearDown()