amend with few improvements, IPv6IsAllowed prefers IPs from network interfaces (if available for platform) and uses DNS (socket.getaddrinfo) as a fallback only

pull/3460/head
sebres 2023-01-10 12:20:48 +01:00
parent 09c23fd5b8
commit cb8674e68a
2 changed files with 131 additions and 92 deletions

View File

@ -92,7 +92,7 @@ class DNSUtils:
# retrieve ips
ips = set()
saveerr = None
for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed(True) else (socket.AF_INET,)):
for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed() else (socket.AF_INET,)):
try:
for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP):
# if getaddrinfo returns something unexpected:
@ -188,6 +188,25 @@ class DNSUtils:
DNSUtils.CACHE_ipToName.set(DNSUtils._getSelfNames_key, names)
return names
# key to find cached network interfaces IPs (this tuple-key cannot be used elsewhere):
_getNetIntrfIPs_key = ('netintrf','ips')
@staticmethod
def getNetIntrfIPs():
"""Get own IP addresses of self"""
# to find cached own IPs:
ips = DNSUtils.CACHE_nameToIp.get(DNSUtils._getNetIntrfIPs_key)
if ips is not None:
return ips
# try to obtain from network interfaces if possible (implemented for this platform):
try:
ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
except:
ips = IPAddrSet()
# cache and return :
DNSUtils.CACHE_nameToIp.set(DNSUtils._getNetIntrfIPs_key, ips)
return ips
# key to find cached own IPs (this tuple-key cannot be used elsewhere):
_getSelfIPs_key = ('self','ips')
@ -199,14 +218,11 @@ class DNSUtils:
if ips is not None:
return ips
# firstly try to obtain from network interfaces if possible (implemented for this platform):
try:
ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
except:
ips = IPAddrSet()
ips = IPAddrSet(DNSUtils.getNetIntrfIPs())
# extend it using different ways (a set with IPs of localhost, hostname, fully qualified):
for hostname in DNSUtils.getSelfNames():
try:
ips |= IPAddrSet(DNSUtils.textToIp(hostname, 'yes'))
ips |= IPAddrSet(DNSUtils.dnsToIp(hostname))
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own IPs of %s failed: %s", hostname, e)
# cache and return :
@ -257,7 +273,7 @@ class DNSUtils:
_IPv6IsAllowed_key = ('self','ipv6-allowed')
@staticmethod
def IPv6IsAllowed(knownOnly=False):
def IPv6IsAllowed():
if DNSUtils._IPv6IsAllowed is not None:
return DNSUtils._IPv6IsAllowed
v = DNSUtils.CACHE_nameToIp.get(DNSUtils._IPv6IsAllowed_key)
@ -265,11 +281,15 @@ class DNSUtils:
return v
v = DNSUtils._IPv6IsSupportedBySystem()
if v is None:
# avoid self recursion (and assume we may have IPv6 during auto-detection):
if knownOnly:
return True
# detect by IPs of host:
v = any((':' in ip.ntoa) for ip in DNSUtils.getSelfIPs())
ips = DNSUtils.getNetIntrfIPs()
if not ips:
DNSUtils._IPv6IsAllowed = True; # avoid self recursion from getSelfIPs -> dnsToIp -> IPv6IsAllowed
try:
ips = DNSUtils.getSelfIPs()
finally:
DNSUtils._IPv6IsAllowed = None
v = any((':' in ip.ntoa) for ip in ips)
DNSUtils.CACHE_nameToIp.set(DNSUtils._IPv6IsAllowed_key, v)
return v
@ -659,7 +679,7 @@ def _NetworkInterfacesAddrs():
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
# Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc)
try:
from ctypes import (
Structure, Union, POINTER,
pointer, get_errno, cast,
@ -703,7 +723,9 @@ def _NetworkInterfacesAddrs():
('ifa_ifu', union_ifa_ifu),
('ifa_data', c_void_p),]
libc = ctypes.CDLL(ctypes.util.find_library('c'))
libc = ctypes.CDLL(ctypes.util.find_library('c') or "")
if not libc.getifaddrs: # pragma: no cover
raise NotImplementedError('libc.getifaddrs is not available')
def ifap_iter(ifap):
ifa = ifap.contents
@ -749,6 +771,11 @@ def _NetworkInterfacesAddrs():
finally:
libc.freeifaddrs(ifap)
except Exception as e: # pragma: no cover
_init_error = NotImplementedError(e)
def _NetworkInterfacesAddrs():
raise _init_error
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);
return _NetworkInterfacesAddrs()

View File

@ -2333,6 +2333,17 @@ class DNSUtilsNetworkTests(unittest.TestCase):
ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2))
ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))
def test_NetworkInterfacesAddrs(self):
try:
ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
ip = IPAddr('127.0.0.1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
ip = IPAddr('::1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
except Exception as e: # pragma: no cover
# simply skip if not available, TODO: make coverage platform dependent
raise unittest.SkipTest(e)
def test_IPAddrSet(self):
ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')])
self.assertTrue(IPAddr('192.0.2.1') in ips)
@ -2347,7 +2358,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
if cov == 'dns': # mock-up _NetworkInterfacesAddrs like it's not implemented (raises error)
_org_NetworkInterfacesAddrs = DNSUtils._NetworkInterfacesAddrs
def _tmp_NetworkInterfacesAddrs():
raise NotImplementedError();
raise NotImplementedError()
DNSUtils._NetworkInterfacesAddrs = staticmethod(_tmp_NetworkInterfacesAddrs)
try:
ips = DNSUtils.getSelfIPs()
@ -2364,6 +2375,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
DNSUtils._NetworkInterfacesAddrs = staticmethod(_org_NetworkInterfacesAddrs)
if cov != 'last':
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key)
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key)
def testFQDN(self):
unittest.F2B.SkipIfNoNetwork()