systemd backend: better avoidance of landing in dead space by seeks over journals;

increase verbosity and stability of few systemd tests (fixes sporadic timing issues);
seekToTime doesn't need to convert float to datetime, because seek_realtime accepts it as unix time (we need to convert integers only, since it means microseconds and deprecated);
pull/2296/merge
sebres 2022-02-09 14:47:40 +01:00
parent 498e473a10
commit cdb6a46945
2 changed files with 30 additions and 21 deletions

View File

@ -22,7 +22,6 @@ __author__ = "Steven Hiscocks"
__copyright__ = "Copyright (c) 2013 Steven Hiscocks" __copyright__ = "Copyright (c) 2013 Steven Hiscocks"
__license__ = "GPL" __license__ = "GPL"
import datetime
import os import os
import time import time
from distutils.version import LooseVersion from distutils.version import LooseVersion
@ -254,8 +253,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
return ((logline[:0], date[0], logline.replace('\n', '\\n')), date[1]) return ((logline[:0], date[0], logline.replace('\n', '\\n')), date[1])
def seekToTime(self, date): def seekToTime(self, date):
if not isinstance(date, datetime.datetime): if isinstance(date, (int, long)):
date = datetime.datetime.fromtimestamp(date) date = float(date)
self.__journal.seek_realtime(date) self.__journal.seek_realtime(date)
def inOperationMode(self): def inOperationMode(self):
@ -281,7 +280,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
try: try:
self.__journal.seek_tail() self.__journal.seek_tail()
logentry = self.__journal.get_previous() logentry = self.__journal.get_previous()
self.__journal.get_next() if logentry:
self.__journal.get_next()
except OSError: except OSError:
logentry = None # Reading failure, so safe to ignore logentry = None # Reading failure, so safe to ignore
if logentry: if logentry:
@ -296,12 +296,6 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
self.inOperation = False self.inOperation = False
# Save current time in order to check time to switch "in operation" mode # Save current time in order to check time to switch "in operation" mode
startTime = (1, MyTime.time(), logentry.get('__CURSOR')) startTime = (1, MyTime.time(), logentry.get('__CURSOR'))
# Move back one entry to ensure do not end up in dead space
# if start time beyond end of journal
try:
self.__journal.get_previous()
except OSError:
pass # Reading failure, so safe to ignore
else: else:
# empty journal or no entries for current filter: # empty journal or no entries for current filter:
self.inOperationMode() self.inOperationMode()
@ -311,6 +305,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
# for possible future switches of in-operation mode: # for possible future switches of in-operation mode:
startTime = (0, startTime) startTime = (0, startTime)
# Move back one entry to ensure do not end up in dead space
# if start time beyond end of journal
try:
self.__journal.get_previous()
except OSError:
pass # Reading failure, so safe to ignore
line = None line = None
while self.active: while self.active:
# wait for records (or for timeout in sleeptime seconds): # wait for records (or for timeout in sleeptime seconds):

View File

@ -1514,7 +1514,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
# stop: # stop:
self.filter.stop() self.filter.stop()
self.filter.join() self.filter.join()
MyTime.setTime(time.time() + 2) MyTime.setTime(time.time() + 10)
# update log manually (should cause a seek to end of log without wait for next second): # update log manually (should cause a seek to end of log without wait for next second):
self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time(), 'TEST') self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time(), 'TEST')
# check seek to last (simulated) position succeeds (without bans of previous copied tickets): # check seek to last (simulated) position succeeds (without bans of previous copied tickets):
@ -1522,7 +1522,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self._initFilter() self._initFilter()
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
self.filter.start() self.filter.start()
self.waitForTicks(1) self.waitForTicks(2)
# check new IP but no old IPs found: # check new IP but no old IPs found:
_gen_falure("192.0.2.5") _gen_falure("192.0.2.5")
self.assertFalse(self.jail.getFailTicket()) self.assertFalse(self.jail.getFailTicket())
@ -1535,8 +1535,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self._initFilter() self._initFilter()
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
self.filter.start() self.filter.start()
self.waitForTicks(1) self.waitForTicks(2)
MyTime.setTime(time.time() + 3) MyTime.setTime(time.time() + 20)
# check new IP but no old IPs found: # check new IP but no old IPs found:
_gen_falure("192.0.2.6") _gen_falure("192.0.2.6")
self.assertFalse(self.jail.getFailTicket()) self.assertFalse(self.jail.getFailTicket())
@ -1549,15 +1549,23 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.filter.setMaxRetry(1) self.filter.setMaxRetry(1)
states = [] states = []
def _state(*args): def _state(*args):
self.assertNotIn("** in operation", states) try:
self.assertFalse(self.filter.inOperation) self.assertNotIn("** in operation", states)
states.append("** process line: %r" % (args,)) self.assertFalse(self.filter.inOperation)
states.append("** process line: %r" % (args,))
except Exception as e:
states.append("** failed: %r" % (e,))
raise
self.filter.processLineAndAdd = _state self.filter.processLineAndAdd = _state
def _inoper(): def _inoper():
self.assertNotIn("** in operation", states) try:
self.assertEqual(len(states), 11) self.assertNotIn("** in operation", states)
states.append("** in operation") self.assertEqual(len(states), 11)
self.filter.__class__.inOperationMode(self.filter) states.append("** in operation")
self.filter.__class__.inOperationMode(self.filter)
except Exception as e:
states.append("** failed: %r" % (e,))
raise
self.filter.inOperationMode = _inoper self.filter.inOperationMode = _inoper
self.filter.start() self.filter.start()
self.waitForTicks(12) self.waitForTicks(12)