filtersystemd: code review, wait only if it is necessary - in operational mode and if no more entries retrieved (end of journal);

attempt to fix gh-3396 - ensure we give enough time after journal.wait returns with INVALIDATE (due to rotation, vacuuming or journal files added/removed etc) and move cursor back and forth to avoid entering dead space
pull/3235/merge
sebres 2 years ago
parent 485c50228a
commit 04c252c34b

@ -312,20 +312,37 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
except OSError:
pass # Reading failure, so safe to ignore
wcode = journal.NOP
line = None
while self.active:
# wait for records (or for timeout in sleeptime seconds):
try:
## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
## don't use `journal.close()` for it, because in some python/systemd implementation it may
## cause abnormal program termination
#self.__journal.wait(self.sleeptime) != journal.NOP
##
## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
if not logentry:
Utils.wait_for(lambda: not self.active or \
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
## wait for entries using journal.wait:
if wcode == journal.NOP and self.inOperation:
## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
## don't use `journal.close()` for it, because in some python/systemd implementation it may
## cause abnormal program termination (e. g. segfault)
##
## wait for entries without sleep in intervals, because "sleeping" in journal.wait,
## journal.NOP is 0, so we can wait for non zero (APPEND or INVALIDATE):
wcode = Utils.wait_for(lambda: not self.active and journal.APPEND or \
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL),
self.sleeptime, 0.00001)
## if invalidate (due to rotation, vacuuming or journal files added/removed etc):
if self.active and wcode == journal.INVALIDATE:
if self.ticks:
logSys.log(logging.DEBUG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName)
time.sleep(self.sleeptime * 0.25)
Utils.wait_for(lambda: not self.active or \
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.INVALIDATE,
self.sleeptime * 3, 0.00001)
if self.ticks:
# move back and forth to ensure do not end up in dead space by rotation or vacuuming,
# if position beyond end of journal (gh-3396)
try:
if self.__journal.get_previous(): self.__journal.get_next()
except OSError:
pass
if self.idle:
# because journal.wait will returns immediatelly if we have records in journal,
# just wait a little bit here for not idle, to prevent hi-load:
@ -360,11 +377,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
self.processLineAndAdd(line, tm)
self.__modified += 1
if self.__modified >= 100: # todo: should be configurable
wcode = journal.APPEND; # don't need wait - there are still unprocessed entries
break
else:
# "in operation" mode since we don't have messages anymore (reached end of journal):
if not self.inOperation:
self.inOperationMode()
wcode = journal.NOP; # enter wait - no more entries to process
break
self.__modified = 0
if self.ticks % 10 == 0:
@ -384,6 +403,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
except Exception as e: # pragma: no cover
if not self.active: # if not active - error by stop...
break
wcode = journal.NOP
logSys.error("Caught unhandled exception in main cycle: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
# incr common error counter:
@ -392,15 +412,20 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
logSys.debug("[%s] filter terminated", self.jailName)
# close journal:
self.closeJournal()
logSys.debug("[%s] filter exited (systemd)", self.jailName)
return True
def closeJournal(self):
try:
if self.__journal:
self.__journal.close()
jnl, self.__journal = self.__journal, None
if jnl:
jnl.close()
except Exception as e: # pragma: no cover
logSys.error("Close journal failed: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
logSys.debug("[%s] filter exited (systemd)", self.jailName)
return True
def status(self, flavor="basic"):
ret = super(FilterSystemd, self).status(flavor=flavor)
@ -422,6 +447,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
def onStop(self):
"""Stop monitoring of journal. Invoked after run method.
"""
# close journal:
self.closeJournal()
# ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()

Loading…
Cancel
Save