small code review of FileIPAddrSet: encapsulate check for changed logic to _isModified and slightly increase coverage for it (latency, changed, unchanged)

pull/3955/head
sebres 2025-03-03 23:59:36 +01:00
parent 7233edd0bf
commit 9145db8de3
2 changed files with 38 additions and 16 deletions

View File

@ -765,21 +765,33 @@ class FileIPAddrSet(IPAddrSet):
if m: if m:
return self.fileName == m.group(1) return self.fileName == m.group(1)
def load(self, ifNeeded=True, noError=True): def _isModified(self):
"""Check whether the file is modified (file stats changed)
Side effect: if modified, _fileStats will be updated to last known stats of file
"""
tm = MyTime.time()
# avoid to check it always (not often than maxUpdateLatency):
if tm <= self._nextCheck:
return None; # no check needed
self._nextCheck = tm + self.maxUpdateLatency
stats = os.stat(self.fileName)
stats = stats.st_mtime, stats.st_ino, stats.st_size
if self._fileStats != stats:
self._fileStats = stats
return True; # modified, needs to be reloaded
return False; # unmodified
def load(self, forceReload=False, noError=True):
"""Load set from file (on demand if needed or by forceReload)
"""
try: try:
if ifNeeded: # load only if needed and modified (or first time load on demand)
tm = MyTime.time() if self._isModified() or forceReload:
if tm > self._nextCheck: with open(self.fileName, 'r') as f:
self._nextCheck = tm + self.maxUpdateLatency ips = f.read()
stats = os.stat(self.fileName) ips = splitwords(ips, ignoreComments=True)
stats = stats.st_mtime, stats.st_ino, stats.st_size self.set(ips)
if self._fileStats == stats:
return
self._fileStats = stats
with open(self.fileName, 'r') as f:
ips = f.read()
ips = splitwords(ips, ignoreComments=True)
self.set(ips)
except Exception as e: # pragma: no cover except Exception as e: # pragma: no cover
if not noError: raise e if not noError: raise e
logSys.warning("Retrieving IPs set from %r failed: %s", self.fileName, e) logSys.warning("Retrieving IPs set from %r failed: %s", self.fileName, e)
@ -793,9 +805,9 @@ class FileIPAddrSet(IPAddrSet):
return self._shortRepr return self._shortRepr
def __contains__(self, ip): def __contains__(self, ip):
# check it is up-to-date (not often than maxUpdateLatency): # load if needed:
if self.fileName: if self.fileName:
self.load(ifNeeded=True) self.load()
# inherited contains: # inherited contains:
return IPAddrSet.__contains__(self, ip) return IPAddrSet.__contains__(self, ip)

View File

@ -2546,6 +2546,16 @@ class DNSUtilsNetworkTests(unittest.TestCase):
self.assertTrue(IPAddr('192.0.2.202') in ips) self.assertTrue(IPAddr('192.0.2.202') in ips)
self.assertTrue(IPAddr('2001:db8::ca') in ips) self.assertTrue(IPAddr('2001:db8::ca') in ips)
self.assertTrue(IPAddr('2001:db8::cb') in ips) self.assertTrue(IPAddr('2001:db8::cb') in ips)
# +1m, jump to next minute to force next check for update:
MyTime.setTime(MyTime.time() + 60)
self.assertFalse(ips._isModified()); # must be unchanged
self.assertEqual(ips._isModified(), None); # not checked by latency (same time)
f.write(b"""#END of file\n""")
f.flush()
# +1m, jump to next minute to force next check for update:
MyTime.setTime(MyTime.time() + 60)
self.assertTrue(ips._isModified()); # must be modified
self.assertEqual(ips._isModified(), None); # not checked by latency (same time)
finally: finally:
tearDownMyTime() tearDownMyTime()
_killfile(f, fname) _killfile(f, fname)