mirror of https://github.com/fail2ban/fail2ban
CIDR splitting functionality moved from filter to IPAddr;
meantime commit: code review, simplification, pythonization, etc. + test cases extendedpull/1414/head
parent
1b21f21c22
commit
d65e37e93d
|
@ -343,19 +343,11 @@ class Filter(JailThread):
|
|||
# An empty string is always false
|
||||
if ipstr == "":
|
||||
return
|
||||
s = ipstr.split('/', 1)
|
||||
# IP address without CIDR mask
|
||||
if len(s) == 1:
|
||||
s.insert(1, -1) # <0 means no CIDR
|
||||
elif "." in s[1]: # 255.255.255.0 style mask
|
||||
s[1] = IPAddr.masktoplen(s[1])
|
||||
s[1] = long(s[1])
|
||||
|
||||
# Create IP address object
|
||||
ip = IPAddr(s[0], s[1])
|
||||
ip = IPAddr(ipstr)
|
||||
|
||||
# log and append to ignore list
|
||||
logSys.debug("Add %r to ignore list (%r, %r)", ip, s[0], s[1])
|
||||
logSys.debug("Add %r to ignore list (%r)", ip, ipstr)
|
||||
self.__ignoreIpList.append(ip)
|
||||
|
||||
def delIgnoreIP(self, ip):
|
||||
|
|
|
@ -150,8 +150,20 @@ class IPAddr(object):
|
|||
IPAddr.CACHE_OBJ.set(args, ip)
|
||||
return ip
|
||||
|
||||
# object methods
|
||||
def __init(self, ipstring, cidr=-1):
|
||||
@staticmethod
|
||||
def __wrap_ipstr(ipstr):
|
||||
if "/" not in ipstr:
|
||||
return ipstr, -1
|
||||
s = ipstr.split('/', 1)
|
||||
# IP address without CIDR mask
|
||||
if len(s) > 2:
|
||||
raise ValueError("invalid ipstr %r, too many plen representation" % (ipstr,))
|
||||
if "." in s[1]: # 255.255.255.0 style mask
|
||||
s[1] = IPAddr.masktoplen(s[1])
|
||||
s[1] = long(s[1])
|
||||
return s
|
||||
|
||||
def __init(self, ipstr, cidr=-1):
|
||||
""" initialize IP object by converting IP address string
|
||||
to binary to integer
|
||||
"""
|
||||
|
@ -160,9 +172,13 @@ class IPAddr(object):
|
|||
self._plen = 0
|
||||
self._maskplen = None
|
||||
self._raw = ""
|
||||
|
||||
if cidr == -1:
|
||||
ipstr, cidr = self.__wrap_ipstr(ipstr)
|
||||
|
||||
for family in [socket.AF_INET, socket.AF_INET6]:
|
||||
try:
|
||||
binary = socket.inet_pton(family, ipstring)
|
||||
binary = socket.inet_pton(family, ipstr)
|
||||
self._family = family
|
||||
break
|
||||
except socket.error:
|
||||
|
@ -200,18 +216,13 @@ class IPAddr(object):
|
|||
# string couldn't be converted neither to a IPv4 nor
|
||||
# to a IPv6 address - retain raw input for later use
|
||||
# (e.g. DNS resolution)
|
||||
self._raw = ipstring
|
||||
self._raw = ipstr
|
||||
|
||||
def __repr__(self):
|
||||
if self.isIPv4 and self.plen < 32:
|
||||
return "%s/%d" % (self.ntoa, self.plen)
|
||||
elif self.isIPv6 and self.plen < 128:
|
||||
return "%s/%d" % (self.ntoa, self.plen)
|
||||
else:
|
||||
return self.ntoa
|
||||
return self.ntoa
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
return self.ntoa
|
||||
|
||||
@property
|
||||
def addr(self):
|
||||
|
@ -280,10 +291,10 @@ class IPAddr(object):
|
|||
def hexdump(self):
|
||||
"""Hex representation of the IP address (for debug purposes)
|
||||
"""
|
||||
if self.family == socket.AF_INET:
|
||||
return "%08x" % self.addr
|
||||
elif self.family == socket.AF_INET6:
|
||||
return "%032x" % self.addr
|
||||
if self._family == socket.AF_INET:
|
||||
return "%08x" % self._addr
|
||||
elif self._family == socket.AF_INET6:
|
||||
return "%032x" % self._addr
|
||||
else:
|
||||
return ""
|
||||
|
||||
|
@ -293,18 +304,23 @@ class IPAddr(object):
|
|||
""" represent IP object as text like the deprecated
|
||||
C pendant inet.ntoa but address family independent
|
||||
"""
|
||||
add = ''
|
||||
if self.isIPv4:
|
||||
# convert network to host byte order
|
||||
binary = struct.pack("!L", self._addr)
|
||||
if self._plen and self._plen < 32:
|
||||
add = "/%d" % self._plen
|
||||
elif self.isIPv6:
|
||||
# convert network to host byte order
|
||||
hi = self.addr >> 64
|
||||
lo = self.addr & 0xFFFFFFFFFFFFFFFFL
|
||||
hi = self._addr >> 64
|
||||
lo = self._addr & 0xFFFFFFFFFFFFFFFFL
|
||||
binary = struct.pack("!QQ", hi, lo)
|
||||
if self._plen and self._plen < 128:
|
||||
add = "/%d" % self._plen
|
||||
else:
|
||||
return self._raw
|
||||
|
||||
return socket.inet_ntop(self.family, binary)
|
||||
return socket.inet_ntop(self._family, binary) + add
|
||||
|
||||
def getPTR(self, suffix=""):
|
||||
""" return the DNS PTR string of the provided IP address object
|
||||
|
|
|
@ -1446,6 +1446,17 @@ class DNSUtilsNetworkTests(unittest.TestCase):
|
|||
# compare with string direct:
|
||||
self.assertEqual(d, d2)
|
||||
|
||||
def testIPAddr_CIDR(self):
|
||||
self.assertEqual(str(IPAddr('93.184.0.1', 24)), '93.184.0.0/24')
|
||||
self.assertEqual(str(IPAddr('192.168.1.0/255.255.255.128')), '192.168.1.0/25')
|
||||
self.assertEqual(IPAddr('93.184.0.1', 24).ntoa, '93.184.0.0/24')
|
||||
self.assertEqual(IPAddr('192.168.1.0/255.255.255.128').ntoa, '192.168.1.0/25')
|
||||
|
||||
self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8::', 120)), '2606:2800:220:1:248:1893:25c8:0/120')
|
||||
self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8::', 120).ntoa, '2606:2800:220:1:248:1893:25c8:0/120')
|
||||
self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8:0/120')), '2606:2800:220:1:248:1893:25c8:0/120')
|
||||
self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:0/120').ntoa, '2606:2800:220:1:248:1893:25c8:0/120')
|
||||
|
||||
def testIPAddr_CompareDNS(self):
|
||||
ips = IPAddr('example.com')
|
||||
self.assertTrue(IPAddr("93.184.216.34").isInNet(ips))
|
||||
|
|
Loading…
Reference in New Issue