backend `systemd` extended with new parameter `rotated` (default false, as prevention against "too many open files"), that allows to monitor only actual journals and ignore a lot of rotated files by default; so can drastically reduce amount of used file descriptors (to 1 or 2 per jail);

closes #3391
pull/3927/merge
sebres 2025-03-30 19:03:32 +02:00
parent 7a4985178f
commit 4eef68b3d3
2 changed files with 103 additions and 10 deletions

View File

@ -25,18 +25,61 @@ __license__ = "GPL"
import os
import time
from glob import glob
from systemd import journal
from .failmanager import FailManagerEmpty
from .filter import JournalFilter, Filter
from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger, logging, splitwords, uni_decode
from ..helpers import getLogger, logging, splitwords, uni_decode, _as_bool
# Gets the instance of the logger.
logSys = getLogger(__name__)
_systemdPathCache = Utils.Cache()
def _getSystemdPath(path):
"""Get systemd path using systemd-path command (cached)"""
p = _systemdPathCache.get(path)
if p: return p
p = Utils.executeCmd('systemd-path %s' % path, timeout=10, shell=True, output=True)
if p and p[0]:
p = str(p[1].decode('utf-8')).split('\n')[0]
_systemdPathCache.set(path, p)
return p
p = '/var/log' if path == 'system-state-logs' else ('/run/log' if path == 'system-runtime-logs' else None)
_systemdPathCache.set(path, p)
return p
def _globJournalFiles(flags=None, path=None):
"""Get journal files without rotated files."""
filesSet = set()
_join = os.path.join
def _addJF(filesSet, p, flags):
"""add journal files to set corresponding path and flags (without rotated *@*.journal)"""
# system journal:
if (flags is None) or (flags & journal.SYSTEM_ONLY):
filesSet |= set(glob(_join(p,'system.journal'))) - set(glob(_join(p,'system*@*.journal')))
# current user-journal:
if (flags is not None) and (flags & journal.CURRENT_USER):
uid = os.getuid()
filesSet |= set(glob(_join(p,('user-%s.journal' % uid)))) - set(glob(_join(p,('user-%s@*.journal' % uid))))
# all local journals:
if (flags is None) or not (flags & (journal.SYSTEM_ONLY|journal.CURRENT_USER)):
filesSet |= set(glob(_join(p,'*.journal'))) - set(glob(_join(p,'*@*.journal')))
if path:
# journals relative given path only:
_addJF(filesSet, path, flags)
else:
# persistent journals corresponding flags:
if (flags is None) or not (flags & journal.RUNTIME_ONLY):
_addJF(filesSet, _join(_getSystemdPath('system-state-logs'), 'journal/*'), flags)
# runtime journals corresponding flags:
_addJF(filesSet, _join(_getSystemdPath('system-runtime-logs'), 'journal/*'), flags)
return filesSet
##
# Journal reader class.
#
@ -75,24 +118,40 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
except KeyError:
pass
else:
import glob
p = args['files']
if not isinstance(p, (list, set, tuple)):
p = splitwords(p)
files = []
for p in p:
files.extend(glob.glob(p))
files.extend(glob(p))
args['files'] = list(set(files))
# Default flags is SYSTEM_ONLY(4). This would lead to ignore user session files,
# so can prevent "Too many open files" errors on a lot of user sessions (see gh-2392):
rotated = _as_bool(kwargs.pop('rotated', 0))
# Default flags is SYSTEM_ONLY(4) or LOCAL_ONLY(1), depending on rotated parameter.
# This could lead to ignore user session files, so together with ignoring rotated
# files would prevent "Too many open files" errors on a lot of user sessions (see gh-2392):
try:
args['flags'] = int(kwargs.pop('journalflags'))
except KeyError:
# be sure all journal types will be opened if files/path specified (don't set flags):
if ('files' not in args or not len(args['files'])) and ('path' not in args or not args['path']):
args['flags'] = int(os.getenv("F2B_SYSTEMD_DEFAULT_FLAGS", 4))
if (not args.get('files') and not args.get('path')):
args['flags'] = os.getenv("F2B_SYSTEMD_DEFAULT_FLAGS", None)
if args['flags'] is not None:
args['flags'] = int(args['flags'])
elif rotated:
args['flags'] = journal.SYSTEM_ONLY
# To avoid monitoring rotated logs, as prevention against "Too many open files",
# set the files to system.journal and user-*.journal (without rotated *@*.journal):
if not rotated and not args.get('files') and not args.get('namespace'):
args['files'] = _globJournalFiles(
args.get('flags', journal.LOCAL_ONLY), args.get('path'))
if args['files']:
args['files'] = list(args['files'])
args['path'] = None; # cannot be cannot be specified simultaneously with files
else:
args['files'] = None
try:
args['namespace'] = kwargs.pop('namespace')
except KeyError:
@ -128,7 +187,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
# to reopen journal we'd simply invoke inherited init again:
self.__journal.close()
ja = self.__jrnlargs
super(journal.Reader, self.__journal).__init__(ja.get('flags', 0), ja.get('path'), ja.get('files'), ja.get('namespace'))
super(journal.Reader, self.__journal).__init__(
ja.get('flags', 0), ja.get('path'), ja.get('files'), ja.get('namespace'))
except:
# cannot reopen in that way, so simply recreate reader:
self.closeJournal()

View File

@ -1510,6 +1510,32 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
return MonitorJournalFailures._runtimeJournal
raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)')
@with_tmpdir
def testGlobJournal(self, tmp):
try:
from ..server.filtersystemd import journal, _globJournalFiles
except ImportError: # pragma: no cover
raise unittest.SkipTest("systemd python interface not available")
jrnlfile = self._getRuntimeJournal()
jrnlpath = os.path.dirname(jrnlfile)
self.assertIn(jrnlfile, _globJournalFiles(journal.SYSTEM_ONLY))
self.assertIn(jrnlfile, _globJournalFiles(journal.SYSTEM_ONLY, jrnlpath))
self.assertIn(jrnlfile, _globJournalFiles(journal.LOCAL_ONLY))
self.assertIn(jrnlfile, _globJournalFiles(journal.LOCAL_ONLY, jrnlpath))
# no files yet in temp-path:
self.assertFalse(_globJournalFiles(None, tmp))
# test against temp-path, shall ignore all rotated files:
tsysjrnl = os.path.join(tmp, 'system.journal')
tusrjrnl = os.path.join(tmp, 'user-%s.journal' % os.getuid())
def touch(fn): os.close(os.open(fn, os.O_CREAT|os.O_APPEND))
touch(tsysjrnl);
touch(tusrjrnl);
touch(os.path.join(tmp, 'system@test-rotated.journal'));
touch(os.path.join(tmp, 'user-%s@test-rotated.journal' % os.getuid()));
self.assertSortedEqual(_globJournalFiles(None, tmp), {tsysjrnl, tusrjrnl})
self.assertSortedEqual(_globJournalFiles(journal.SYSTEM_ONLY, tmp), {tsysjrnl})
self.assertSortedEqual(_globJournalFiles(journal.CURRENT_USER, tmp), {tusrjrnl})
def testJournalFilesArg(self):
# retrieve current system journal path
jrnlfile = self._getRuntimeJournal()
@ -1533,9 +1559,16 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.assertTrue(self.isEmpty(1))
self.assertEqual(len(self.jail), 0)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
def testJournalPath_RotatedArg(self):
# retrieve current system journal path
jrnlpath = self._getRuntimeJournal()
jrnlpath = os.path.dirname(jrnlpath)
self._initFilter(journalpath=jrnlpath, rotated=1)
def testJournalFlagsArg(self):
self._initFilter(journalflags=0) # e. g. 2 - journal.RUNTIME_ONLY
self._initFilter(journalflags=0)
def testJournalFlags_RotatedArg(self):
self._initFilter(journalflags=0, rotated=1)
def assert_correct_ban(self, test_ip, test_attempts):
self.assertTrue(self.waitFailTotal(test_attempts, 10)) # give Filter a chance to react