mirror of https://github.com/fail2ban/fail2ban
ENH: made some code more pythonic, removed not needed groupping in IP_CRE6, more tests
parent
1a19fd8ee0
commit
f49067bb35
|
@ -582,8 +582,8 @@ import socket, struct
|
|||
|
||||
class DNSUtils:
|
||||
|
||||
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
|
||||
IP_CRE6 = re.compile("^(?:[0-9:A-Fa-f]{3,})$")
|
||||
IPv4_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
|
||||
IPv6_CRE = re.compile("^[0-9:A-Fa-f]{3,}$")
|
||||
|
||||
#@staticmethod
|
||||
def dnsToIp(dns):
|
||||
|
@ -600,20 +600,15 @@ class DNSUtils:
|
|||
|
||||
#@staticmethod
|
||||
def searchIP(text):
|
||||
""" Search if an IP address if directly available and return
|
||||
""" Search if an IP address is directly available and return
|
||||
it.
|
||||
"""
|
||||
match = DNSUtils.IP_CRE.match(text)
|
||||
if match:
|
||||
return match
|
||||
else:
|
||||
match = DNSUtils.IP_CRE6.match(text)
|
||||
if match:
|
||||
""" Right Here, we faced to a ipv6
|
||||
"""
|
||||
return match
|
||||
else:
|
||||
return None
|
||||
match = DNSUtils.IPv4_CRE.match(text) \
|
||||
or DNSUtils.IPv6_CRE.match(text)
|
||||
# return None if no match (so and fails right away)
|
||||
# or actual match (string) otherwise
|
||||
return match and match.group(0)
|
||||
|
||||
searchIP = staticmethod(searchIP)
|
||||
|
||||
#@staticmethod
|
||||
|
@ -623,38 +618,34 @@ class DNSUtils:
|
|||
# try to convert to ipv4
|
||||
try:
|
||||
socket.inet_aton(s[0])
|
||||
return True
|
||||
except socket.error:
|
||||
# if it had failed try to convert ipv6
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, s[0])
|
||||
return True
|
||||
except socket.error:
|
||||
# not a valid address in both stacks
|
||||
return False
|
||||
return True
|
||||
|
||||
isValidIP = staticmethod(isValidIP)
|
||||
|
||||
#@staticmethod
|
||||
def textToIp(text, useDns):
|
||||
""" Return the IP of DNS found in a given text.
|
||||
def textToIp(text, useDns='no'):
|
||||
""" Converts text to an IP optionally performing DNS resolution.
|
||||
"""
|
||||
ipList = list()
|
||||
ipList = []
|
||||
# Search for plain IP
|
||||
plainIP = DNSUtils.searchIP(text)
|
||||
if not plainIP is None:
|
||||
plainIPStr = plainIP.group(0)
|
||||
if DNSUtils.isValidIP(plainIPStr):
|
||||
ipList.append(plainIPStr)
|
||||
|
||||
# If we are allowed to resolve -- give it a try if nothing was found
|
||||
if useDns in ("yes", "warn") and not ipList:
|
||||
if plainIP and DNSUtils.isValidIP(plainIP):
|
||||
ipList.append(plainIP)
|
||||
elif useDns in ("yes", "warn"):
|
||||
# If we are allowed to resolve -- give it a try if it is not IP
|
||||
# Try to get IP from possible DNS
|
||||
ip = DNSUtils.dnsToIp(text)
|
||||
ipList.extend(ip)
|
||||
if ip and useDns == "warn":
|
||||
logSys.warning("Determined IP using DNS Reverse Lookup: %s = %s",
|
||||
text, ipList)
|
||||
|
||||
return ipList
|
||||
textToIp = staticmethod(textToIp)
|
||||
|
||||
|
|
|
@ -605,6 +605,45 @@ class GetFailures(unittest.TestCase):
|
|||
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
|
||||
|
||||
class DNSUtilsTests(unittest.TestCase):
|
||||
_ips_hosts = [
|
||||
'1.2.3.4',
|
||||
'1.2.3.255',
|
||||
'255.255.255.255',
|
||||
]
|
||||
_ips_nets = [
|
||||
'1.2.3.0/24',
|
||||
# and now ipv6
|
||||
'::1/128',
|
||||
'2001:0:53ab:63c:418:2bbf:e7c3:123e/32',
|
||||
'fe80::ffff:ffff:ffff/64',
|
||||
]
|
||||
_ips_v6_same = [
|
||||
# the 3 views of the same ip
|
||||
'2001:0000:0234:C1AB:0000:00A0:AABC:003F',
|
||||
'2001:0:234:C1AB:0:A0:AABC:3F',
|
||||
'2001:0:0234:C1ab:0:A0:aabc:3F',
|
||||
]
|
||||
|
||||
def testIsValidIP(self):
|
||||
# just a basic check if works
|
||||
for ip in self._ips_hosts + self._ips_nets + self._ips_v6_same:
|
||||
self.assertTrue(DNSUtils.isValidIP(ip),
|
||||
msg="%r IS a valid IP" % ip)
|
||||
|
||||
for ip in ['1.2.3.256',
|
||||
'::ffffffff',
|
||||
'2001::0234:C1ab::A0:aabc:003F']:
|
||||
self.assertFalse(DNSUtils.isValidIP(ip),
|
||||
msg="%r is not a valid IP" % ip)
|
||||
|
||||
def testSearchIP(self):
|
||||
# just sweep through a set of them and see if returns the same
|
||||
for ip in self._ips_hosts + self._ips_v6_same:
|
||||
self.assertEqual(ip, DNSUtils.searchIP(ip))
|
||||
|
||||
# should be None for networks
|
||||
for ip in self._ips_nets:
|
||||
self.assertEqual(None, DNSUtils.searchIP(ip))
|
||||
|
||||
def testUseDns(self):
|
||||
res = DNSUtils.textToIp('www.example.com', 'no')
|
||||
|
@ -620,13 +659,23 @@ class DNSUtilsTests(unittest.TestCase):
|
|||
'www.example.com',
|
||||
'doh1.2.3.4.buga.xxxxx.yyy.invalid',
|
||||
'1.2.3.4.buga.xxxxx.yyy.invalid',
|
||||
'1.2.3.',
|
||||
# '1.2.3', # socket manages to resolve it to 1.2.0.3
|
||||
]
|
||||
for s in hostnames:
|
||||
res = DNSUtils.textToIp(s, 'yes')
|
||||
if s == 'www.example.com':
|
||||
self.assertEqual(res, ['192.0.43.10'])
|
||||
else:
|
||||
self.assertEqual(res, [])
|
||||
self.assertEqual(res, [],
|
||||
msg="Should have failed to get ip for %r. "
|
||||
"Got %s" % (s, res))
|
||||
|
||||
# make sure that normal IPs are not cut anyhow
|
||||
for useDns in ('yes', 'no'):
|
||||
for s in self._ips_hosts + self._ips_v6_same:
|
||||
res = DNSUtils.textToIp(s, useDns)
|
||||
self.assertEqual([s], res)
|
||||
|
||||
class JailTests(unittest.TestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue