several backends optimizations (in file and journal filters):

- don't need to wait if we still had log-entries from last iteration (which got interrupted for servicing)
- rewritten update log/journal position, it is more stable and faster now (fewer DB access and surely up-to-date at end)
pull/3146/head
sebres 2021-11-03 15:35:59 +01:00
parent 96661f25ab
commit 3b02098817
3 changed files with 86 additions and 20 deletions

View File

@ -104,6 +104,10 @@ class Filter(JailThread):
## Error counter (protected, so can be used in filter implementations) ## Error counter (protected, so can be used in filter implementations)
## if it reached 100 (at once), run-cycle will go idle ## if it reached 100 (at once), run-cycle will go idle
self._errors = 0 self._errors = 0
## Next time to update log or journal position in database:
self._nextUpdateTM = 0
## Pending updates (must be executed at next update time or during stop):
self._pendDBUpdates = {}
## return raw host (host is not dns): ## return raw host (host is not dns):
self.returnRawHost = False self.returnRawHost = False
## check each regex (used for test purposes): ## check each regex (used for test purposes):
@ -1023,9 +1027,6 @@ class FileFilter(Filter):
log = self.__logs.pop(path) log = self.__logs.pop(path)
except KeyError: except KeyError:
return return
db = self.jail.database
if db is not None:
db.updateLog(self.jail, log)
logSys.info("Removed logfile: %r", path) logSys.info("Removed logfile: %r", path)
self._delLogPath(path) self._delLogPath(path)
return return
@ -1145,9 +1146,15 @@ class FileFilter(Filter):
self.processLineAndAdd(line.rstrip('\r\n')) self.processLineAndAdd(line.rstrip('\r\n'))
finally: finally:
log.close() log.close()
db = self.jail.database if self.jail.database is not None:
if db is not None: self._pendDBUpdates[log] = 1
db.updateLog(self.jail, log) if (
self.ticks % 100 == 0
or MyTime.time() >= self._nextUpdateTM
or not self.active
):
self._updateDBPending()
self._nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5
return True return True
## ##
@ -1247,12 +1254,33 @@ class FileFilter(Filter):
ret.append(("File list", path)) ret.append(("File list", path))
return ret return ret
def stop(self): def _updateDBPending(self):
"""Stop monitoring of log-file(s) """Apply pending updates (log position) to database.
""" """
db = self.jail.database
while True:
try:
log, args = self._pendDBUpdates.popitem()
except KeyError:
break
db.updateLog(self.jail, log)
def onStop(self):
"""Stop monitoring of log-file(s). Invoked after run method.
"""
# ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()
# stop files monitoring: # stop files monitoring:
for path in self.__logs.keys(): for path in self.__logs.keys():
self.delLogPath(path) self.delLogPath(path)
def stop(self):
"""Stop filter
"""
# normally onStop will be called automatically in thread after its run ends,
# but for backwards compatibilities we'll invoke it in caller of stop method.
self.onStop()
# stop thread: # stop thread:
super(Filter, self).stop() super(Filter, self).stop()
@ -1304,6 +1332,15 @@ class FileContainer:
## shows that log is in operation mode (expecting new messages only from here): ## shows that log is in operation mode (expecting new messages only from here):
self.inOperation = tail self.inOperation = tail
def __hash__(self):
return hash(self.__filename)
def __eq__(self, other):
return (id(self) == id(other) or
self.__filename == (other.__filename if isinstance(other, FileContainer) else other)
)
def __repr__(self):
return 'file-log:'+self.__filename
def getFileName(self): def getFileName(self):
return self.__filename return self.__filename

View File

@ -61,7 +61,6 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
# Initialise systemd-journal connection # Initialise systemd-journal connection
self.__journal = journal.Reader(**jrnlargs) self.__journal = journal.Reader(**jrnlargs)
self.__matches = [] self.__matches = []
self.__nextUpdateTM = 0
self.setDatePattern(None) self.setDatePattern(None)
logSys.debug("Created FilterSystemd") logSys.debug("Created FilterSystemd")
@ -321,6 +320,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
#self.__journal.wait(self.sleeptime) != journal.NOP #self.__journal.wait(self.sleeptime) != journal.NOP
## ##
## wait for entries without sleep in intervals, because "sleeping" in journal.wait: ## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
if not logentry:
Utils.wait_for(lambda: not self.active or \ Utils.wait_for(lambda: not self.active or \
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP, self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
self.sleeptime, 0.00001) self.sleeptime, 0.00001)
@ -368,15 +368,17 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
if self.ticks % 10 == 0: if self.ticks % 10 == 0:
self.performSvc() self.performSvc()
# update position in log (time and iso string): # update position in log (time and iso string):
if (line and self.jail.database and ( if self.jail.database:
self.ticks % 10 == 0 if line:
or MyTime.time() >= self.__nextUpdateTM self._pendDBUpdates['systemd-journal'] = (tm, line[1])
or not self.active
)
):
self.jail.database.updateJournal(self.jail, 'systemd-journal', tm, line[1])
self.__nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5
line = None line = None
if self._pendDBUpdates and (
self.ticks % 100 == 0
or MyTime.time() >= self._nextUpdateTM
or not self.active
):
self._updateDBPending()
self._nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5
except Exception as e: # pragma: no cover except Exception as e: # pragma: no cover
if not self.active: # if not active - error by stop... if not self.active: # if not active - error by stop...
break break
@ -403,3 +405,22 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
ret.append(("Journal matches", ret.append(("Journal matches",
[" + ".join(" ".join(match) for match in self.__matches)])) [" + ".join(" ".join(match) for match in self.__matches)]))
return ret return ret
def _updateDBPending(self):
"""Apply pending updates (jornal position) to database.
"""
db = self.jail.database
while True:
try:
log, args = self._pendDBUpdates.popitem()
except KeyError:
break
db.updateJournal(self.jail, log, *args)
def onStop(self):
"""Stop monitoring of journal. Invoked after run method.
"""
# ensure positions of pending logs are up-to-date:
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()

View File

@ -67,6 +67,8 @@ class JailThread(Thread):
def run_with_except_hook(*args, **kwargs): def run_with_except_hook(*args, **kwargs):
try: try:
run(*args, **kwargs) run(*args, **kwargs)
# call on stop callback to do some finalizations:
self.onStop()
except Exception as e: except Exception as e:
# avoid very sporadic error "'NoneType' object has no attribute 'exc_info'" (https://bugs.python.org/issue7336) # avoid very sporadic error "'NoneType' object has no attribute 'exc_info'" (https://bugs.python.org/issue7336)
# only extremely fast systems are affected ATM (2.7 / 3.x), if thread ends nothing is available here. # only extremely fast systems are affected ATM (2.7 / 3.x), if thread ends nothing is available here.
@ -97,6 +99,12 @@ class JailThread(Thread):
self.active = True self.active = True
super(JailThread, self).start() super(JailThread, self).start()
@abstractmethod
def onStop(self): # pragma: no cover - absract
"""Abstract - Called when thread ends (after run).
"""
pass
def stop(self): def stop(self):
"""Sets `active` property to False, to flag run method to return. """Sets `active` property to False, to flag run method to return.
""" """