mirror of https://github.com/fail2ban/fail2ban
Fix retrieving of IPv6 address with dnsToIp on some systems (default returns AF_INET family only), fix network test-cases.
parent
61c8cd11b8
commit
fe06ffca71
|
@ -64,16 +64,19 @@ class DNSUtils:
|
|||
if ips is not None:
|
||||
return ips
|
||||
# retrieve ips
|
||||
try:
|
||||
ips = list()
|
||||
for result in socket.getaddrinfo(dns, None, 0, 0, socket.IPPROTO_TCP):
|
||||
ip = IPAddr(result[4][0])
|
||||
saveerr = None
|
||||
for fam, ipfam in ((socket.AF_INET, IPAddr.FAM_IPv4), (socket.AF_INET6, IPAddr.FAM_IPv6)):
|
||||
try:
|
||||
for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP):
|
||||
ip = IPAddr(result[4][0], ipfam)
|
||||
if ip.isValid:
|
||||
ips.append(ip)
|
||||
except socket.error as e:
|
||||
# todo: make configurable the expired time of cache entry:
|
||||
logSys.warning("Unable to find a corresponding IP address for %s: %s", dns, e)
|
||||
ips = list()
|
||||
saveerr = e
|
||||
if not ips and saveerr:
|
||||
logSys.warning("Unable to find a corresponding IP address for %s: %s", dns, saveerr)
|
||||
|
||||
DNSUtils.CACHE_nameToIp.set(dns, ips)
|
||||
return ips
|
||||
|
||||
|
@ -140,6 +143,8 @@ class IPAddr(object):
|
|||
|
||||
CIDR_RAW = -2
|
||||
CIDR_UNSPEC = -1
|
||||
FAM_IPv4 = CIDR_RAW - socket.AF_INET
|
||||
FAM_IPv6 = CIDR_RAW - socket.AF_INET6
|
||||
|
||||
def __new__(cls, ipstr, cidr=CIDR_UNSPEC):
|
||||
# check already cached as IPAddr
|
||||
|
@ -191,7 +196,11 @@ class IPAddr(object):
|
|||
self._raw = ipstr
|
||||
# if not raw - recognize family, set addr, etc.:
|
||||
if cidr != IPAddr.CIDR_RAW:
|
||||
for family in [socket.AF_INET, socket.AF_INET6]:
|
||||
if cidr is not None and cidr < IPAddr.CIDR_RAW:
|
||||
family = [IPAddr.CIDR_RAW - cidr]
|
||||
else:
|
||||
family = [socket.AF_INET, socket.AF_INET6]
|
||||
for family in family:
|
||||
try:
|
||||
binary = socket.inet_pton(family, ipstr)
|
||||
self._family = family
|
||||
|
|
|
@ -337,6 +337,11 @@ class IgnoreIP(LogCaptureTestCase):
|
|||
for ip in ipList:
|
||||
self.filter.addIgnoreIP(ip)
|
||||
self.assertFalse(self.filter.inIgnoreIPList(ip))
|
||||
if not unittest.F2B.no_network: # pragma: no cover
|
||||
self.assertLogged(
|
||||
'Unable to find a corresponding IP address for 999.999.999.999',
|
||||
'Unable to find a corresponding IP address for abcdef.abcdef',
|
||||
'Unable to find a corresponding IP address for 192.168.0.', all=True)
|
||||
|
||||
def testIgnoreIPCIDR(self):
|
||||
self.filter.addIgnoreIP('192.168.1.0/25')
|
||||
|
|
Loading…
Reference in New Issue