From 9145db8de3872011e090fc6d22c33d705af95850 Mon Sep 17 00:00:00 2001 From: sebres Date: Mon, 3 Mar 2025 23:59:36 +0100 Subject: [PATCH] small code review of FileIPAddrSet: encapsulate check for changed logic to _isModified and slightly increase coverage for it (latency, changed, unchanged) --- fail2ban/server/ipdns.py | 44 ++++++++++++++++++++------------ fail2ban/tests/filtertestcase.py | 10 ++++++++ 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py index a485a1b2..1264479e 100644 --- a/fail2ban/server/ipdns.py +++ b/fail2ban/server/ipdns.py @@ -765,21 +765,33 @@ class FileIPAddrSet(IPAddrSet): if m: return self.fileName == m.group(1) - def load(self, ifNeeded=True, noError=True): + def _isModified(self): + """Check whether the file is modified (file stats changed) + + Side effect: if modified, _fileStats will be updated to last known stats of file + """ + tm = MyTime.time() + # avoid to check it always (not often than maxUpdateLatency): + if tm <= self._nextCheck: + return None; # no check needed + self._nextCheck = tm + self.maxUpdateLatency + stats = os.stat(self.fileName) + stats = stats.st_mtime, stats.st_ino, stats.st_size + if self._fileStats != stats: + self._fileStats = stats + return True; # modified, needs to be reloaded + return False; # unmodified + + def load(self, forceReload=False, noError=True): + """Load set from file (on demand if needed or by forceReload) + """ try: - if ifNeeded: - tm = MyTime.time() - if tm > self._nextCheck: - self._nextCheck = tm + self.maxUpdateLatency - stats = os.stat(self.fileName) - stats = stats.st_mtime, stats.st_ino, stats.st_size - if self._fileStats == stats: - return - self._fileStats = stats - with open(self.fileName, 'r') as f: - ips = f.read() - ips = splitwords(ips, ignoreComments=True) - self.set(ips) + # load only if needed and modified (or first time load on demand) + if self._isModified() or forceReload: + with open(self.fileName, 'r') as f: + ips = f.read() + ips = splitwords(ips, ignoreComments=True) + self.set(ips) except Exception as e: # pragma: no cover if not noError: raise e logSys.warning("Retrieving IPs set from %r failed: %s", self.fileName, e) @@ -793,9 +805,9 @@ class FileIPAddrSet(IPAddrSet): return self._shortRepr def __contains__(self, ip): - # check it is up-to-date (not often than maxUpdateLatency): + # load if needed: if self.fileName: - self.load(ifNeeded=True) + self.load() # inherited contains: return IPAddrSet.__contains__(self, ip) diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index ece75201..d12c93bc 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -2546,6 +2546,16 @@ class DNSUtilsNetworkTests(unittest.TestCase): self.assertTrue(IPAddr('192.0.2.202') in ips) self.assertTrue(IPAddr('2001:db8::ca') in ips) self.assertTrue(IPAddr('2001:db8::cb') in ips) + # +1m, jump to next minute to force next check for update: + MyTime.setTime(MyTime.time() + 60) + self.assertFalse(ips._isModified()); # must be unchanged + self.assertEqual(ips._isModified(), None); # not checked by latency (same time) + f.write(b"""#END of file\n""") + f.flush() + # +1m, jump to next minute to force next check for update: + MyTime.setTime(MyTime.time() + 60) + self.assertTrue(ips._isModified()); # must be modified + self.assertEqual(ips._isModified(), None); # not checked by latency (same time) finally: tearDownMyTime() _killfile(f, fname)