diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py index 75e21a31..8c5c277e 100644 --- a/fail2ban/server/ipdns.py +++ b/fail2ban/server/ipdns.py @@ -92,7 +92,7 @@ class DNSUtils: # retrieve ips ips = set() saveerr = None - for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed(True) else (socket.AF_INET,)): + for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed() else (socket.AF_INET,)): try: for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP): # if getaddrinfo returns something unexpected: @@ -188,6 +188,25 @@ class DNSUtils: DNSUtils.CACHE_ipToName.set(DNSUtils._getSelfNames_key, names) return names + # key to find cached network interfaces IPs (this tuple-key cannot be used elsewhere): + _getNetIntrfIPs_key = ('netintrf','ips') + + @staticmethod + def getNetIntrfIPs(): + """Get own IP addresses of self""" + # to find cached own IPs: + ips = DNSUtils.CACHE_nameToIp.get(DNSUtils._getNetIntrfIPs_key) + if ips is not None: + return ips + # try to obtain from network interfaces if possible (implemented for this platform): + try: + ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()]) + except: + ips = IPAddrSet() + # cache and return : + DNSUtils.CACHE_nameToIp.set(DNSUtils._getNetIntrfIPs_key, ips) + return ips + # key to find cached own IPs (this tuple-key cannot be used elsewhere): _getSelfIPs_key = ('self','ips') @@ -199,14 +218,11 @@ class DNSUtils: if ips is not None: return ips # firstly try to obtain from network interfaces if possible (implemented for this platform): - try: - ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()]) - except: - ips = IPAddrSet() + ips = IPAddrSet(DNSUtils.getNetIntrfIPs()) # extend it using different ways (a set with IPs of localhost, hostname, fully qualified): for hostname in DNSUtils.getSelfNames(): try: - ips |= IPAddrSet(DNSUtils.textToIp(hostname, 'yes')) + ips |= IPAddrSet(DNSUtils.dnsToIp(hostname)) except Exception as e: # pragma: no cover logSys.warning("Retrieving own IPs of %s failed: %s", hostname, e) # cache and return : @@ -257,7 +273,7 @@ class DNSUtils: _IPv6IsAllowed_key = ('self','ipv6-allowed') @staticmethod - def IPv6IsAllowed(knownOnly=False): + def IPv6IsAllowed(): if DNSUtils._IPv6IsAllowed is not None: return DNSUtils._IPv6IsAllowed v = DNSUtils.CACHE_nameToIp.get(DNSUtils._IPv6IsAllowed_key) @@ -265,11 +281,15 @@ class DNSUtils: return v v = DNSUtils._IPv6IsSupportedBySystem() if v is None: - # avoid self recursion (and assume we may have IPv6 during auto-detection): - if knownOnly: - return True # detect by IPs of host: - v = any((':' in ip.ntoa) for ip in DNSUtils.getSelfIPs()) + ips = DNSUtils.getNetIntrfIPs() + if not ips: + DNSUtils._IPv6IsAllowed = True; # avoid self recursion from getSelfIPs -> dnsToIp -> IPv6IsAllowed + try: + ips = DNSUtils.getSelfIPs() + finally: + DNSUtils._IPv6IsAllowed = None + v = any((':' in ip.ntoa) for ip in ips) DNSUtils.CACHE_nameToIp.set(DNSUtils._IPv6IsAllowed_key, v) return v @@ -659,95 +679,102 @@ def _NetworkInterfacesAddrs(): # Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand: # Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc) + try: + from ctypes import ( + Structure, Union, POINTER, + pointer, get_errno, cast, + c_ushort, c_byte, c_void_p, c_char_p, c_uint, c_int, c_uint16, c_uint32 + ) + import ctypes.util + import ctypes - from ctypes import ( - Structure, Union, POINTER, - pointer, get_errno, cast, - c_ushort, c_byte, c_void_p, c_char_p, c_uint, c_int, c_uint16, c_uint32 - ) - import ctypes.util - import ctypes + class struct_sockaddr(Structure): + _fields_ = [ + ('sa_family', c_ushort), + ('sa_data', c_byte * 14),] - class struct_sockaddr(Structure): - _fields_ = [ - ('sa_family', c_ushort), - ('sa_data', c_byte * 14),] + class struct_sockaddr_in(Structure): + _fields_ = [ + ('sin_family', c_ushort), + ('sin_port', c_uint16), + ('sin_addr', c_byte * 4)] - class struct_sockaddr_in(Structure): - _fields_ = [ - ('sin_family', c_ushort), - ('sin_port', c_uint16), - ('sin_addr', c_byte * 4)] + class struct_sockaddr_in6(Structure): + _fields_ = [ + ('sin6_family', c_ushort), + ('sin6_port', c_uint16), + ('sin6_flowinfo', c_uint32), + ('sin6_addr', c_byte * 16), + ('sin6_scope_id', c_uint32)] - class struct_sockaddr_in6(Structure): - _fields_ = [ - ('sin6_family', c_ushort), - ('sin6_port', c_uint16), - ('sin6_flowinfo', c_uint32), - ('sin6_addr', c_byte * 16), - ('sin6_scope_id', c_uint32)] + class union_ifa_ifu(Union): + _fields_ = [ + ('ifu_broadaddr', POINTER(struct_sockaddr)), + ('ifu_dstaddr', POINTER(struct_sockaddr)),] - class union_ifa_ifu(Union): - _fields_ = [ - ('ifu_broadaddr', POINTER(struct_sockaddr)), - ('ifu_dstaddr', POINTER(struct_sockaddr)),] + class struct_ifaddrs(Structure): + pass + struct_ifaddrs._fields_ = [ + ('ifa_next', POINTER(struct_ifaddrs)), + ('ifa_name', c_char_p), + ('ifa_flags', c_uint), + ('ifa_addr', POINTER(struct_sockaddr)), + ('ifa_netmask', POINTER(struct_sockaddr)), + ('ifa_ifu', union_ifa_ifu), + ('ifa_data', c_void_p),] - class struct_ifaddrs(Structure): - pass - struct_ifaddrs._fields_ = [ - ('ifa_next', POINTER(struct_ifaddrs)), - ('ifa_name', c_char_p), - ('ifa_flags', c_uint), - ('ifa_addr', POINTER(struct_sockaddr)), - ('ifa_netmask', POINTER(struct_sockaddr)), - ('ifa_ifu', union_ifa_ifu), - ('ifa_data', c_void_p),] + libc = ctypes.CDLL(ctypes.util.find_library('c') or "") + if not libc.getifaddrs: # pragma: no cover + raise NotImplementedError('libc.getifaddrs is not available') - libc = ctypes.CDLL(ctypes.util.find_library('c')) + def ifap_iter(ifap): + ifa = ifap.contents + while True: + yield ifa + if not ifa.ifa_next: + break + ifa = ifa.ifa_next.contents - def ifap_iter(ifap): - ifa = ifap.contents - while True: - yield ifa - if not ifa.ifa_next: - break - ifa = ifa.ifa_next.contents + def getfamaddr(ifa): + sa = ifa.ifa_addr.contents + fam = sa.sa_family + if fam == socket.AF_INET: + sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents + addr = socket.inet_ntop(fam, sa.sin_addr) + nm = ifa.ifa_netmask.contents + if nm is not None and nm.sa_family == socket.AF_INET: + nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents + addr += '/'+socket.inet_ntop(fam, nm.sin_addr) + return IPAddr(addr) + elif fam == socket.AF_INET6: + sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents + addr = socket.inet_ntop(fam, sa.sin6_addr) + nm = ifa.ifa_netmask.contents + if nm is not None and nm.sa_family == socket.AF_INET6: + nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents + addr += '/'+socket.inet_ntop(fam, nm.sin6_addr) + return IPAddr(addr) + return None - def getfamaddr(ifa): - sa = ifa.ifa_addr.contents - fam = sa.sa_family - if fam == socket.AF_INET: - sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents - addr = socket.inet_ntop(fam, sa.sin_addr) - nm = ifa.ifa_netmask.contents - if nm is not None and nm.sa_family == socket.AF_INET: - nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents - addr += '/'+socket.inet_ntop(fam, nm.sin_addr) - return IPAddr(addr) - elif fam == socket.AF_INET6: - sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents - addr = socket.inet_ntop(fam, sa.sin6_addr) - nm = ifa.ifa_netmask.contents - if nm is not None and nm.sa_family == socket.AF_INET6: - nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents - addr += '/'+socket.inet_ntop(fam, nm.sin6_addr) - return IPAddr(addr) - return None - - def _NetworkInterfacesAddrs(): - ifap = POINTER(struct_ifaddrs)() - result = libc.getifaddrs(pointer(ifap)) - if result != 0: - raise OSError(get_errno()) - del result - try: - for ifa in ifap_iter(ifap): - name = ifa.ifa_name.decode("UTF-8") - addr = getfamaddr(ifa) - if addr: - yield name, addr - finally: - libc.freeifaddrs(ifap) + def _NetworkInterfacesAddrs(): + ifap = POINTER(struct_ifaddrs)() + result = libc.getifaddrs(pointer(ifap)) + if result != 0: + raise OSError(get_errno()) + del result + try: + for ifa in ifap_iter(ifap): + name = ifa.ifa_name.decode("UTF-8") + addr = getfamaddr(ifa) + if addr: + yield name, addr + finally: + libc.freeifaddrs(ifap) + + except Exception as e: # pragma: no cover + _init_error = NotImplementedError(e) + def _NetworkInterfacesAddrs(): + raise _init_error DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs); return _NetworkInterfacesAddrs() diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 24f5272e..9f96c190 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -2333,6 +2333,17 @@ class DNSUtilsNetworkTests(unittest.TestCase): ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2)) ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2)) + def test_NetworkInterfacesAddrs(self): + try: + ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()]) + ip = IPAddr('127.0.0.1') + self.assertEqual(ip in ips, any(ip in n for n in ips)) + ip = IPAddr('::1') + self.assertEqual(ip in ips, any(ip in n for n in ips)) + except Exception as e: # pragma: no cover + # simply skip if not available, TODO: make coverage platform dependent + raise unittest.SkipTest(e) + def test_IPAddrSet(self): ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')]) self.assertTrue(IPAddr('192.0.2.1') in ips) @@ -2347,7 +2358,7 @@ class DNSUtilsNetworkTests(unittest.TestCase): if cov == 'dns': # mock-up _NetworkInterfacesAddrs like it's not implemented (raises error) _org_NetworkInterfacesAddrs = DNSUtils._NetworkInterfacesAddrs def _tmp_NetworkInterfacesAddrs(): - raise NotImplementedError(); + raise NotImplementedError() DNSUtils._NetworkInterfacesAddrs = staticmethod(_tmp_NetworkInterfacesAddrs) try: ips = DNSUtils.getSelfIPs() @@ -2364,6 +2375,7 @@ class DNSUtilsNetworkTests(unittest.TestCase): DNSUtils._NetworkInterfacesAddrs = staticmethod(_org_NetworkInterfacesAddrs) if cov != 'last': DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key) + DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key) def testFQDN(self): unittest.F2B.SkipIfNoNetwork()