diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index abd66e1f..29f90be5 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -52,12 +52,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover # @param jail the jail object def __init__(self, jail, **kwargs): - jrnlargs = FilterSystemd._getJournalArgs(kwargs) + self.__jrnlargs = FilterSystemd._getJournalArgs(kwargs) JournalFilter.__init__(self, jail, **kwargs) self.__modified = 0 # Initialise systemd-journal connection - self.__journal = journal.Reader(**jrnlargs) + self.__journal = journal.Reader(**self.__jrnlargs) self.__matches = [] + self.__bypassInvalidateMsg = 0 self.setDatePattern(None) logSys.debug("Created FilterSystemd") @@ -99,6 +100,43 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover return args + @property + def _journalAlive(self): + """Checks journal is online. + """ + try: + # open? + if self.__journal.closed: # pragma: no cover + return False + # has cursor? if it is broken (e. g. no descriptor) - it'd raise this: + # OSError: [Errno 99] Cannot assign requested address + if self.__journal._get_cursor(): + return True + except OSError: # pragma: no cover + pass + return False + + def _reopenJournal(self): # pragma: no cover + """Reopen journal (if it becomes offline after rotation) + """ + if self.__journal.closed: + # recreate reader: + self.__journal = journal.Reader(**self.__jrnlargs) + else: + try: + # workaround for gh-3929 (no journal descriptor after rotation), + # to reopen journal we'd simply invoke inherited init again: + ja = self.__jrnlargs + super(journal.Reader, self.__journal).__init__(ja.get('flags', 0), ja.get('path'), ja.get('files'), ja.get('namespace')) + except: + # cannot reopen in that way, so simply recreate reader: + self.closeJournal() + self.__journal = journal.Reader(**self.__jrnlargs) + # restore journalmatch specified for the jail: + self.resetJournalMatches() + # just to avoid "Invalidate signaled" happening again after reopen: + self.__bypassInvalidateMsg = MyTime.time() + 1 + ## # Add a journal match filters from list structure # @@ -257,6 +295,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover def inOperationMode(self): self.inOperation = True logSys.info("[%s] Jail is in operation now (process new journal entries)", self.jailName) + # just to avoid "Invalidate signaled" happening often at start: + self.__bypassInvalidateMsg = MyTime.time() + 1 ## # Main loop. @@ -328,8 +368,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover ## if invalidate (due to rotation, vacuuming or journal files added/removed etc): if self.active and wcode == journal.INVALIDATE: if self.ticks: - logSys.log(logging.DEBUG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName) + if not self.__bypassInvalidateMsg or MyTime.time() > self.__bypassInvalidateMsg: + logSys.log(logging.MSG, "[%s] Invalidate signaled, take a little break (rotation ends)", self.jailName) time.sleep(self.sleeptime * 0.25) + self.__bypassInvalidateMsg = 0 Utils.wait_for(lambda: not self.active or \ self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.INVALIDATE, self.sleeptime * 3, 0.00001) @@ -340,6 +382,11 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover if self.__journal.get_previous(): self.__journal.get_next() except OSError: pass + # if it is not alive - reopen: + if not self._journalAlive: + logSys.log(logging.MSG, "[%s] Journal reader seems to be offline, reopen journal", self.jailName) + self._reopenJournal() + wcode = journal.NOP if self.idle: # because journal.wait will returns immediately if we have records in journal, # just wait a little bit here for not idle, to prevent hi-load: