mirror of https://github.com/fail2ban/fail2ban
Merge branch '0.10' into 0.11
commit
6d19d2e800
|
@ -312,20 +312,37 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
|
|||
except OSError:
|
||||
pass # Reading failure, so safe to ignore
|
||||
|
||||
wcode = journal.NOP
|
||||
line = None
|
||||
while self.active:
|
||||
# wait for records (or for timeout in sleeptime seconds):
|
||||
try:
|
||||
## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
|
||||
## don't use `journal.close()` for it, because in some python/systemd implementation it may
|
||||
## cause abnormal program termination
|
||||
#self.__journal.wait(self.sleeptime) != journal.NOP
|
||||
##
|
||||
## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
|
||||
if not logentry:
|
||||
Utils.wait_for(lambda: not self.active or \
|
||||
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
|
||||
## wait for entries using journal.wait:
|
||||
if wcode == journal.NOP and self.inOperation:
|
||||
## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
|
||||
## don't use `journal.close()` for it, because in some python/systemd implementation it may
|
||||
## cause abnormal program termination (e. g. segfault)
|
||||
##
|
||||
## wait for entries without sleep in intervals, because "sleeping" in journal.wait,
|
||||
## journal.NOP is 0, so we can wait for non zero (APPEND or INVALIDATE):
|
||||
wcode = Utils.wait_for(lambda: not self.active and journal.APPEND or \
|
||||
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL),
|
||||
self.sleeptime, 0.00001)
|
||||
## if invalidate (due to rotation, vacuuming or journal files added/removed etc):
|
||||
if self.active and wcode == journal.INVALIDATE:
|
||||
if self.ticks:
|
||||
logSys.log(logging.DEBUG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName)
|
||||
time.sleep(self.sleeptime * 0.25)
|
||||
Utils.wait_for(lambda: not self.active or \
|
||||
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.INVALIDATE,
|
||||
self.sleeptime * 3, 0.00001)
|
||||
if self.ticks:
|
||||
# move back and forth to ensure do not end up in dead space by rotation or vacuuming,
|
||||
# if position beyond end of journal (gh-3396)
|
||||
try:
|
||||
if self.__journal.get_previous(): self.__journal.get_next()
|
||||
except OSError:
|
||||
pass
|
||||
if self.idle:
|
||||
# because journal.wait will returns immediatelly if we have records in journal,
|
||||
# just wait a little bit here for not idle, to prevent hi-load:
|
||||
|
@ -360,11 +377,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
|
|||
self.processLineAndAdd(line, tm)
|
||||
self.__modified += 1
|
||||
if self.__modified >= 100: # todo: should be configurable
|
||||
wcode = journal.APPEND; # don't need wait - there are still unprocessed entries
|
||||
break
|
||||
else:
|
||||
# "in operation" mode since we don't have messages anymore (reached end of journal):
|
||||
if not self.inOperation:
|
||||
self.inOperationMode()
|
||||
wcode = journal.NOP; # enter wait - no more entries to process
|
||||
break
|
||||
self.__modified = 0
|
||||
if self.ticks % 10 == 0:
|
||||
|
@ -384,6 +403,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
|
|||
except Exception as e: # pragma: no cover
|
||||
if not self.active: # if not active - error by stop...
|
||||
break
|
||||
wcode = journal.NOP
|
||||
logSys.error("Caught unhandled exception in main cycle: %r", e,
|
||||
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
|
||||
# incr common error counter:
|
||||
|
@ -392,15 +412,20 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
|
|||
logSys.debug("[%s] filter terminated", self.jailName)
|
||||
|
||||
# close journal:
|
||||
self.closeJournal()
|
||||
|
||||
logSys.debug("[%s] filter exited (systemd)", self.jailName)
|
||||
return True
|
||||
|
||||
def closeJournal(self):
|
||||
try:
|
||||
if self.__journal:
|
||||
self.__journal.close()
|
||||
jnl, self.__journal = self.__journal, None
|
||||
if jnl:
|
||||
jnl.close()
|
||||
except Exception as e: # pragma: no cover
|
||||
logSys.error("Close journal failed: %r", e,
|
||||
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
|
||||
|
||||
logSys.debug("[%s] filter exited (systemd)", self.jailName)
|
||||
return True
|
||||
|
||||
def status(self, flavor="basic"):
|
||||
ret = super(FilterSystemd, self).status(flavor=flavor)
|
||||
|
@ -422,6 +447,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
|
|||
def onStop(self):
|
||||
"""Stop monitoring of journal. Invoked after run method.
|
||||
"""
|
||||
# close journal:
|
||||
self.closeJournal()
|
||||
# ensure positions of pending logs are up-to-date:
|
||||
if self._pendDBUpdates and self.jail.database:
|
||||
self._updateDBPending()
|
||||
|
|
Loading…
Reference in New Issue