Merge branch 'fix-gh-3438':

* circumvent SEGFAULT in a python's socket module by getaddrinfo with disabled IPv6 (gh-3438)
* improve auto-detection of IPv6 support (`allowipv6 = auto` by default)
* improve `ignoreself` by considering all local addresses from network interfaces additionally to IPs from hostnames (gh-3132)
pull/3460/head
sebres 2023-01-11 18:41:15 +01:00
commit a3a3fffa54
3 changed files with 279 additions and 26 deletions

View File

@ -11,8 +11,13 @@ ver. 1.0.3-dev-1 (20??/??/??) - development nightly edition
-----------
### Fixes
* circumvent SEGFAULT in a python's socket module by getaddrinfo with disabled IPv6 (gh-3438)
### New Features and Enhancements
* better auto-detection for IPv6 support (`allowipv6 = auto` by default), trying to check sysctl net.ipv6.conf.all.disable_ipv6
(value read from `/proc/sys/net/ipv6/conf/all/disable_ipv6`) if available, otherwise seeks over local IPv6 from network interfaces
if available for platform and uses DNS to find local IPv6 as a fallback only
* improve `ignoreself` by considering all local addresses from network interfaces additionally to IPs from hostnames (gh-3132)
ver. 1.0.2 (2022/11/09) - finally-war-game-test-tape-not-a-nuclear-alarm

View File

@ -92,14 +92,14 @@ class DNSUtils:
# retrieve ips
ips = set()
saveerr = None
for fam, ipfam in ((socket.AF_INET, IPAddr.FAM_IPv4), (socket.AF_INET6, IPAddr.FAM_IPv6)):
for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed() else (socket.AF_INET,)):
try:
for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP):
# if getaddrinfo returns something unexpected:
if len(result) < 4 or not len(result[4]): continue
# get ip from `(2, 1, 6, '', ('127.0.0.1', 0))`,be sure we've an ip-string
# (some python-versions resp. host configurations causes returning of integer there):
ip = IPAddr(str(result[4][0]), ipfam)
ip = IPAddr(str(result[4][0]), IPAddr._AF2FAM(fam))
if ip.isValid:
ips.add(ip)
except Exception as e:
@ -154,17 +154,18 @@ class DNSUtils:
# try find cached own hostnames (this tuple-key cannot be used elsewhere):
key = ('self','hostname', fqdn)
name = DNSUtils.CACHE_ipToName.get(key)
if name is not None:
return name
# get it using different ways (hostname, fully-qualified or vice versa):
if name is None:
name = ''
for hostname in (
(getfqdn, socket.gethostname) if fqdn else (socket.gethostname, getfqdn)
):
try:
name = hostname()
break
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own hostnames failed: %s", e)
name = ''
for hostname in (
(getfqdn, socket.gethostname) if fqdn else (socket.gethostname, getfqdn)
):
try:
name = hostname()
break
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own hostnames failed: %s", e)
# cache and return :
DNSUtils.CACHE_ipToName.set(key, name)
return name
@ -177,15 +178,35 @@ class DNSUtils:
"""Get own host names of self"""
# try find cached own hostnames:
names = DNSUtils.CACHE_ipToName.get(DNSUtils._getSelfNames_key)
if names is not None:
return names
# get it using different ways (a set with names of localhost, hostname, fully qualified):
if names is None:
names = set([
'localhost', DNSUtils.getHostname(False), DNSUtils.getHostname(True)
]) - set(['']) # getHostname can return ''
names = set([
'localhost', DNSUtils.getHostname(False), DNSUtils.getHostname(True)
]) - set(['']) # getHostname can return ''
# cache and return :
DNSUtils.CACHE_ipToName.set(DNSUtils._getSelfNames_key, names)
return names
# key to find cached network interfaces IPs (this tuple-key cannot be used elsewhere):
_getNetIntrfIPs_key = ('netintrf','ips')
@staticmethod
def getNetIntrfIPs():
"""Get own IP addresses of self"""
# to find cached own IPs:
ips = DNSUtils.CACHE_nameToIp.get(DNSUtils._getNetIntrfIPs_key)
if ips is not None:
return ips
# try to obtain from network interfaces if possible (implemented for this platform):
try:
ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
except:
ips = IPAddrSet()
# cache and return :
DNSUtils.CACHE_nameToIp.set(DNSUtils._getNetIntrfIPs_key, ips)
return ips
# key to find cached own IPs (this tuple-key cannot be used elsewhere):
_getSelfIPs_key = ('self','ips')
@ -194,20 +215,54 @@ class DNSUtils:
"""Get own IP addresses of self"""
# to find cached own IPs:
ips = DNSUtils.CACHE_nameToIp.get(DNSUtils._getSelfIPs_key)
# get it using different ways (a set with IPs of localhost, hostname, fully qualified):
if ips is None:
ips = set()
for hostname in DNSUtils.getSelfNames():
try:
ips |= set(DNSUtils.textToIp(hostname, 'yes'))
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own IPs of %s failed: %s", hostname, e)
if ips is not None:
return ips
# firstly try to obtain from network interfaces if possible (implemented for this platform):
ips = IPAddrSet(DNSUtils.getNetIntrfIPs())
# extend it using different ways (a set with IPs of localhost, hostname, fully qualified):
for hostname in DNSUtils.getSelfNames():
try:
ips |= IPAddrSet(DNSUtils.dnsToIp(hostname))
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own IPs of %s failed: %s", hostname, e)
# cache and return :
DNSUtils.CACHE_nameToIp.set(DNSUtils._getSelfIPs_key, ips)
return ips
_IPv6IsAllowed = None
@staticmethod
def _IPv6IsSupportedBySystem():
if not socket.has_ipv6:
return False
# try to check sysctl net.ipv6.conf.all.disable_ipv6:
try:
with open('/proc/sys/net/ipv6/conf/all/disable_ipv6', 'rb') as f:
# if 1 - disabled, 0 - enabled
return not int(f.read())
except:
pass
s = None
try:
# try to create INET6 socket:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# bind it to free port for any interface supporting IPv6:
s.bind(("", 0));
return True
except Exception as e: # pragma: no cover
if hasattr(e, 'errno'):
import errno
# negative (-9 'Address family not supported', etc) or not available/supported:
if e.errno < 0 or e.errno in (errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT):
return False
# in use:
if e.errno in (errno.EADDRINUSE, errno.EACCES): # normally unreachable (free port and root)
return True
finally:
if s: s.close()
# unable to detect:
return None
@staticmethod
def setIPv6IsAllowed(value):
DNSUtils._IPv6IsAllowed = value
@ -224,7 +279,17 @@ class DNSUtils:
v = DNSUtils.CACHE_nameToIp.get(DNSUtils._IPv6IsAllowed_key)
if v is not None:
return v
v = any((':' in ip.ntoa) for ip in DNSUtils.getSelfIPs())
v = DNSUtils._IPv6IsSupportedBySystem()
if v is None:
# detect by IPs of host:
ips = DNSUtils.getNetIntrfIPs()
if not ips:
DNSUtils._IPv6IsAllowed = True; # avoid self recursion from getSelfIPs -> dnsToIp -> IPv6IsAllowed
try:
ips = DNSUtils.getSelfIPs()
finally:
DNSUtils._IPv6IsAllowed = None
v = any((':' in ip.ntoa) for ip in ips)
DNSUtils.CACHE_nameToIp.set(DNSUtils._IPv6IsAllowed_key, v)
return v
@ -255,6 +320,9 @@ class IPAddr(object):
CIDR_UNSPEC = -1
FAM_IPv4 = CIDR_RAW - socket.AF_INET
FAM_IPv6 = CIDR_RAW - socket.AF_INET6
@staticmethod
def _AF2FAM(v):
return IPAddr.CIDR_RAW - v
def __new__(cls, ipstr, cidr=CIDR_UNSPEC):
if cidr == IPAddr.CIDR_UNSPEC and isinstance(ipstr, (tuple, list)):
@ -545,6 +613,9 @@ class IPAddr(object):
"""
return isinstance(ip, IPAddr) and (ip == self or ip.isInNet(self))
def __contains__(self, ip):
return self.contains(ip)
# Pre-calculated map: addr to maskplen
def __getMaskMap():
m6 = (1 << 128)-1
@ -594,3 +665,135 @@ class IPAddr(object):
# An IPv4 compatible IPv6 to be reused
IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96)
class IPAddrSet(set):
hasSubNet = False
def __init__(self, ips=[]):
ips2 = set()
for ip in ips:
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
ips2.add(ip)
self.hasSubNet |= not ip.isSingle
set.__init__(self, ips2)
def add(self, ip):
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
self.hasSubNet |= not ip.isSingle
set.add(self, ip)
def __contains__(self, ip):
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
# IP can be found directly or IP is in each subnet:
return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self))
def _NetworkInterfacesAddrs(withMask=False):
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
# Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc)
try:
from ctypes import (
Structure, Union, POINTER,
pointer, get_errno, cast,
c_ushort, c_byte, c_void_p, c_char_p, c_uint, c_int, c_uint16, c_uint32
)
import ctypes.util
import ctypes
class struct_sockaddr(Structure):
_fields_ = [
('sa_family', c_ushort),
('sa_data', c_byte * 14),]
class struct_sockaddr_in(Structure):
_fields_ = [
('sin_family', c_ushort),
('sin_port', c_uint16),
('sin_addr', c_byte * 4)]
class struct_sockaddr_in6(Structure):
_fields_ = [
('sin6_family', c_ushort),
('sin6_port', c_uint16),
('sin6_flowinfo', c_uint32),
('sin6_addr', c_byte * 16),
('sin6_scope_id', c_uint32)]
class union_ifa_ifu(Union):
_fields_ = [
('ifu_broadaddr', POINTER(struct_sockaddr)),
('ifu_dstaddr', POINTER(struct_sockaddr)),]
class struct_ifaddrs(Structure):
pass
struct_ifaddrs._fields_ = [
('ifa_next', POINTER(struct_ifaddrs)),
('ifa_name', c_char_p),
('ifa_flags', c_uint),
('ifa_addr', POINTER(struct_sockaddr)),
('ifa_netmask', POINTER(struct_sockaddr)),
('ifa_ifu', union_ifa_ifu),
('ifa_data', c_void_p),]
libc = ctypes.CDLL(ctypes.util.find_library('c') or "")
if not libc.getifaddrs: # pragma: no cover
raise NotImplementedError('libc.getifaddrs is not available')
def ifap_iter(ifap):
ifa = ifap.contents
while True:
yield ifa
if not ifa.ifa_next:
break
ifa = ifa.ifa_next.contents
def getfamaddr(ifa, withMask=False):
sa = ifa.ifa_addr.contents
fam = sa.sa_family
if fam == socket.AF_INET:
sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents
addr = socket.inet_ntop(fam, sa.sin_addr)
if withMask:
nm = ifa.ifa_netmask.contents
if nm is not None and nm.sa_family == socket.AF_INET:
nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents
addr += '/'+socket.inet_ntop(fam, nm.sin_addr)
return IPAddr(addr)
elif fam == socket.AF_INET6:
sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents
addr = socket.inet_ntop(fam, sa.sin6_addr)
if withMask:
nm = ifa.ifa_netmask.contents
if nm is not None and nm.sa_family == socket.AF_INET6:
nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents
addr += '/'+socket.inet_ntop(fam, nm.sin6_addr)
return IPAddr(addr)
return None
def _NetworkInterfacesAddrs(withMask=False):
ifap = POINTER(struct_ifaddrs)()
result = libc.getifaddrs(pointer(ifap))
if result != 0:
raise OSError(get_errno())
del result
try:
for ifa in ifap_iter(ifap):
name = ifa.ifa_name.decode("UTF-8")
addr = getfamaddr(ifa, withMask)
if addr:
yield name, addr
finally:
libc.freeifaddrs(ifap)
except Exception as e: # pragma: no cover
_init_error = NotImplementedError(e)
def _NetworkInterfacesAddrs():
raise _init_error
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);
return _NetworkInterfacesAddrs(withMask)
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);

View File

@ -40,7 +40,7 @@ from ..server.jail import Jail
from ..server.filterpoll import FilterPoll
from ..server.filter import FailTicket, Filter, FileFilter, FileContainer
from ..server.failmanager import FailManagerEmpty
from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr
from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr, IPAddrSet
from ..server.mytime import MyTime
from ..server.utils import Utils, uni_decode
from .databasetestcase import getFail2BanDb
@ -2333,6 +2333,51 @@ class DNSUtilsNetworkTests(unittest.TestCase):
ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2))
ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))
def test_NetworkInterfacesAddrs(self):
for withMask in (False, True):
try:
ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs(withMask)])
ip = IPAddr('127.0.0.1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
ip = IPAddr('::1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
except Exception as e: # pragma: no cover
# simply skip if not available, TODO: make coverage platform dependent
raise unittest.SkipTest(e)
def test_IPAddrSet(self):
ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')])
self.assertTrue(IPAddr('192.0.2.1') in ips)
self.assertTrue(IPAddr('192.0.2.31') in ips)
self.assertFalse(IPAddr('192.0.2.32') in ips)
self.assertTrue(IPAddr('2001:DB8::1') in ips)
self.assertTrue(IPAddr('2001:0DB8:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF') in ips)
self.assertFalse(IPAddr('2001:DB9::') in ips)
# self IPs must be a set too (cover different mechanisms to obtain own IPs):
for cov in ('ni', 'dns', 'last'):
_org_NetworkInterfacesAddrs = None
if cov == 'dns': # mock-up _NetworkInterfacesAddrs like it's not implemented (raises error)
_org_NetworkInterfacesAddrs = DNSUtils._NetworkInterfacesAddrs
def _tmp_NetworkInterfacesAddrs():
raise NotImplementedError()
DNSUtils._NetworkInterfacesAddrs = staticmethod(_tmp_NetworkInterfacesAddrs)
try:
ips = DNSUtils.getSelfIPs()
# print('*****', ips)
if ips:
ip = IPAddr('127.0.0.1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
ip = IPAddr('127.0.0.2')
self.assertEqual(ip in ips, any(ip in n for n in ips))
ip = IPAddr('::1')
self.assertEqual(ip in ips, any(ip in n for n in ips))
finally:
if _org_NetworkInterfacesAddrs:
DNSUtils._NetworkInterfacesAddrs = staticmethod(_org_NetworkInterfacesAddrs)
if cov != 'last':
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key)
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key)
def testFQDN(self):
unittest.F2B.SkipIfNoNetwork()
sname = DNSUtils.getHostname(fqdn=False)