From 582436aadf310ff2db4d0dc76668c5fa7e5a3c03 Mon Sep 17 00:00:00 2001 From: sebres Date: Wed, 11 Jan 2023 18:27:44 +0100 Subject: [PATCH] don't add subnets to local addresses of `ignoreself` from network interfaces, use only IPs instead (subnets may be too heavy and not wanted, todo: make it configurable later) --- fail2ban/server/ipdns.py | 45 ++++++++++++++++++++++---------- fail2ban/tests/filtertestcase.py | 19 +++++++------- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py index 8c5c277e..b435c6df 100644 --- a/fail2ban/server/ipdns.py +++ b/fail2ban/server/ipdns.py @@ -669,13 +669,28 @@ IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96) class IPAddrSet(set): + hasSubNet = False + + def __init__(self, ips=[]): + ips2 = set() + for ip in ips: + if not isinstance(ip, IPAddr): ip = IPAddr(ip) + ips2.add(ip) + self.hasSubNet |= not ip.isSingle + set.__init__(self, ips2) + + def add(self, ip): + if not isinstance(ip, IPAddr): ip = IPAddr(ip) + self.hasSubNet |= not ip.isSingle + set.add(self, ip) + def __contains__(self, ip): if not isinstance(ip, IPAddr): ip = IPAddr(ip) # IP can be found directly or IP is in each subnet: - return set.__contains__(self, ip) or any(n.contains(ip) for n in self) + return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self)) -def _NetworkInterfacesAddrs(): +def _NetworkInterfacesAddrs(withMask=False): # Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand: # Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc) @@ -735,28 +750,30 @@ def _NetworkInterfacesAddrs(): break ifa = ifa.ifa_next.contents - def getfamaddr(ifa): + def getfamaddr(ifa, withMask=False): sa = ifa.ifa_addr.contents fam = sa.sa_family if fam == socket.AF_INET: sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents addr = socket.inet_ntop(fam, sa.sin_addr) - nm = ifa.ifa_netmask.contents - if nm is not None and nm.sa_family == socket.AF_INET: - nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents - addr += '/'+socket.inet_ntop(fam, nm.sin_addr) + if withMask: + nm = ifa.ifa_netmask.contents + if nm is not None and nm.sa_family == socket.AF_INET: + nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents + addr += '/'+socket.inet_ntop(fam, nm.sin_addr) return IPAddr(addr) elif fam == socket.AF_INET6: sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents addr = socket.inet_ntop(fam, sa.sin6_addr) - nm = ifa.ifa_netmask.contents - if nm is not None and nm.sa_family == socket.AF_INET6: - nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents - addr += '/'+socket.inet_ntop(fam, nm.sin6_addr) + if withMask: + nm = ifa.ifa_netmask.contents + if nm is not None and nm.sa_family == socket.AF_INET6: + nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents + addr += '/'+socket.inet_ntop(fam, nm.sin6_addr) return IPAddr(addr) return None - def _NetworkInterfacesAddrs(): + def _NetworkInterfacesAddrs(withMask=False): ifap = POINTER(struct_ifaddrs)() result = libc.getifaddrs(pointer(ifap)) if result != 0: @@ -765,7 +782,7 @@ def _NetworkInterfacesAddrs(): try: for ifa in ifap_iter(ifap): name = ifa.ifa_name.decode("UTF-8") - addr = getfamaddr(ifa) + addr = getfamaddr(ifa, withMask) if addr: yield name, addr finally: @@ -777,6 +794,6 @@ def _NetworkInterfacesAddrs(): raise _init_error DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs); - return _NetworkInterfacesAddrs() + return _NetworkInterfacesAddrs(withMask) DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs); diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 9f96c190..4e308e38 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -2334,15 +2334,16 @@ class DNSUtilsNetworkTests(unittest.TestCase): ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2)) def test_NetworkInterfacesAddrs(self): - try: - ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()]) - ip = IPAddr('127.0.0.1') - self.assertEqual(ip in ips, any(ip in n for n in ips)) - ip = IPAddr('::1') - self.assertEqual(ip in ips, any(ip in n for n in ips)) - except Exception as e: # pragma: no cover - # simply skip if not available, TODO: make coverage platform dependent - raise unittest.SkipTest(e) + for withMask in (False, True): + try: + ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs(withMask)]) + ip = IPAddr('127.0.0.1') + self.assertEqual(ip in ips, any(ip in n for n in ips)) + ip = IPAddr('::1') + self.assertEqual(ip in ips, any(ip in n for n in ips)) + except Exception as e: # pragma: no cover + # simply skip if not available, TODO: make coverage platform dependent + raise unittest.SkipTest(e) def test_IPAddrSet(self): ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')])