ipdns.py: implemented FileIPAddrSet supporting file with IP-set, what may contain IP, subnet, or dns, with lazy load and dynamically reloaded by changes (with small latency to avoid expensive stats check on every compare)

pull/3955/head
sebres 2025-03-03 18:40:15 +01:00
parent 1c61836169
commit bdae15b522
1 changed files with 108 additions and 4 deletions

View File

@ -23,10 +23,11 @@ __license__ = "GPL"
import socket
import struct
import os
import re
from .utils import Utils
from ..helpers import getLogger
from ..helpers import getLogger, MyTime, splitwords
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -79,6 +80,8 @@ class DNSUtils:
# todo: make configurable the expired time and max count of cache entries:
CACHE_nameToIp = Utils.Cache(maxCount=1000, maxTime=5*60)
CACHE_ipToName = Utils.Cache(maxCount=1000, maxTime=5*60)
# static cache used to hold sets read from files:
CACHE_fileToIp = Utils.Cache(maxCount=100, maxTime=5*60)
@staticmethod
def dnsToIp(dns):
@ -229,6 +232,20 @@ class DNSUtils:
DNSUtils.CACHE_nameToIp.set(DNSUtils._getSelfIPs_key, ips)
return ips
@staticmethod
def getIPsFromFile(fileName, noError=True):
"""Get set of IP addresses or subnets from file"""
# to find cached IPs:
ips = DNSUtils.CACHE_fileToIp.get(fileName)
if ips is not None:
return ips
# try to obtain set from file:
ips = FileIPAddrSet(fileName)
#ips.load() - load on demand
# cache and return :
DNSUtils.CACHE_fileToIp.set(fileName, ips)
return ips
_IPv6IsAllowed = None
@staticmethod
@ -457,6 +474,10 @@ class IPAddr(object):
def familyStr(self):
return IPAddr.FAM2STR.get(self._family)
@property
def instanceType(self):
return "ip" if self.isValid else "dns"
@property
def plen(self):
return self._plen
@ -598,6 +619,9 @@ class IPAddr(object):
def isInNet(self, net):
"""Return either the IP object is in the provided network
"""
# if addr-set:
if isinstance(net, IPAddrSet):
return self in net
# if it isn't a valid IP address, try DNS resolution
if not net.isValid and net.raw != "":
# Check if IP in DNS
@ -675,15 +699,32 @@ IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96)
class IPAddrSet(set):
hasSubNet = False
hasSubNet = 0
def __init__(self, ips=[]):
ips, subnet = IPAddrSet._list2set(ips)
set.__init__(self, ips)
self.hasSubNet = subnet
@staticmethod
def _list2set(ips):
ips2 = set()
subnet = 0
for ip in ips:
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
ips2.add(ip)
self.hasSubNet |= not ip.isSingle
set.__init__(self, ips2)
subnet += not ip.isSingle
return ips2, subnet
@property
def instanceType(self):
return "ip-set"
def set(self, ips):
ips, subnet = IPAddrSet._list2set(ips)
self.clear()
self.update(ips)
self.hasSubNet = subnet
def add(self, ip):
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
@ -696,6 +737,69 @@ class IPAddrSet(set):
return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self))
class FileIPAddrSet(IPAddrSet):
# RE matching file://...
RE_FILE_IGN_IP = re.compile(r'^file:/{0,2}(.*)$')
fileName = ''
_shortRepr = None
maxUpdateLatency = 1 # latency in seconds to update by changes
_nextCheck = 0
_fileStats = ()
def __init__(self, fileName=''):
self.fileName = fileName
# self.load() - lazy load on demand by first check (in, __contains__ etc)
@property
def instanceType(self):
return repr(self)
def __eq__(self, other):
if id(self) == id(other): return 1
# to allow remove file-set from list (delIgnoreIP) by its name:
if isinstance(other, FileIPAddrSet):
return self.fileName == other.fileName
m = FileIPAddrSet.RE_FILE_IGN_IP.match(other)
if m:
return self.fileName == m.group(1)
def load(self, ifNeeded=True, noError=True):
try:
if ifNeeded:
tm = MyTime.time()
if tm > self._nextCheck:
self._nextCheck = tm + self.maxUpdateLatency
stats = os.stat(self.fileName)
stats = stats.st_mtime, stats.st_ino, stats.st_size
if self._fileStats == stats:
return
self._fileStats = stats
with open(self.fileName, 'r') as f:
ips = f.read()
ips = splitwords(ips)
self.set(ips)
except Exception as e: # pragma: no cover
if not noError: raise e
logSys.warning("Retrieving IPs set from %r failed: %s", self.fileName, e)
def __repr__(self):
if not self._shortRepr:
shortfn = os.path.basename(self.fileName)
if shortfn != self.fileName:
shortfn = '.../' + shortfn
self._shortRepr = 'file:' + shortfn + ')'
return self._shortRepr
def __contains__(self, ip):
# check it is uptodate (not often than maxUpdateLatency):
if self.fileName:
self.load(ifNeeded=True)
# inherited contains:
return IPAddrSet.__contains__(self, ip)
def _NetworkInterfacesAddrs(withMask=False):
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand: