diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py index a5625629..757cceba 100644 --- a/fail2ban/server/ipdns.py +++ b/fail2ban/server/ipdns.py @@ -64,16 +64,19 @@ class DNSUtils: if ips is not None: return ips # retrieve ips - try: - ips = list() - for result in socket.getaddrinfo(dns, None, 0, 0, socket.IPPROTO_TCP): - ip = IPAddr(result[4][0]) - if ip.isValid: - ips.append(ip) - except socket.error as e: - # todo: make configurable the expired time of cache entry: - logSys.warning("Unable to find a corresponding IP address for %s: %s", dns, e) - ips = list() + ips = list() + saveerr = None + for fam, ipfam in ((socket.AF_INET, IPAddr.FAM_IPv4), (socket.AF_INET6, IPAddr.FAM_IPv6)): + try: + for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP): + ip = IPAddr(result[4][0], ipfam) + if ip.isValid: + ips.append(ip) + except socket.error as e: + saveerr = e + if not ips and saveerr: + logSys.warning("Unable to find a corresponding IP address for %s: %s", dns, saveerr) + DNSUtils.CACHE_nameToIp.set(dns, ips) return ips @@ -140,6 +143,8 @@ class IPAddr(object): CIDR_RAW = -2 CIDR_UNSPEC = -1 + FAM_IPv4 = CIDR_RAW - socket.AF_INET + FAM_IPv6 = CIDR_RAW - socket.AF_INET6 def __new__(cls, ipstr, cidr=CIDR_UNSPEC): # check already cached as IPAddr @@ -191,7 +196,11 @@ class IPAddr(object): self._raw = ipstr # if not raw - recognize family, set addr, etc.: if cidr != IPAddr.CIDR_RAW: - for family in [socket.AF_INET, socket.AF_INET6]: + if cidr is not None and cidr < IPAddr.CIDR_RAW: + family = [IPAddr.CIDR_RAW - cidr] + else: + family = [socket.AF_INET, socket.AF_INET6] + for family in family: try: binary = socket.inet_pton(family, ipstr) self._family = family diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 986cf1f0..174152b9 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -337,6 +337,11 @@ class IgnoreIP(LogCaptureTestCase): for ip in ipList: self.filter.addIgnoreIP(ip) self.assertFalse(self.filter.inIgnoreIPList(ip)) + if not unittest.F2B.no_network: # pragma: no cover + self.assertLogged( + 'Unable to find a corresponding IP address for 999.999.999.999', + 'Unable to find a corresponding IP address for abcdef.abcdef', + 'Unable to find a corresponding IP address for 192.168.0.', all=True) def testIgnoreIPCIDR(self): self.filter.addIgnoreIP('192.168.1.0/25')