diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 4bbf7996..59d2e0d7 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -343,19 +343,11 @@ class Filter(JailThread): # An empty string is always false if ipstr == "": return - s = ipstr.split('/', 1) - # IP address without CIDR mask - if len(s) == 1: - s.insert(1, -1) # <0 means no CIDR - elif "." in s[1]: # 255.255.255.0 style mask - s[1] = IPAddr.masktoplen(s[1]) - s[1] = long(s[1]) - # Create IP address object - ip = IPAddr(s[0], s[1]) + ip = IPAddr(ipstr) # log and append to ignore list - logSys.debug("Add %r to ignore list (%r, %r)", ip, s[0], s[1]) + logSys.debug("Add %r to ignore list (%r)", ip, ipstr) self.__ignoreIpList.append(ip) def delIgnoreIP(self, ip): diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py index cd22172a..8021b1d8 100644 --- a/fail2ban/server/ipdns.py +++ b/fail2ban/server/ipdns.py @@ -150,8 +150,20 @@ class IPAddr(object): IPAddr.CACHE_OBJ.set(args, ip) return ip - # object methods - def __init(self, ipstring, cidr=-1): + @staticmethod + def __wrap_ipstr(ipstr): + if "/" not in ipstr: + return ipstr, -1 + s = ipstr.split('/', 1) + # IP address without CIDR mask + if len(s) > 2: + raise ValueError("invalid ipstr %r, too many plen representation" % (ipstr,)) + if "." in s[1]: # 255.255.255.0 style mask + s[1] = IPAddr.masktoplen(s[1]) + s[1] = long(s[1]) + return s + + def __init(self, ipstr, cidr=-1): """ initialize IP object by converting IP address string to binary to integer """ @@ -160,9 +172,13 @@ class IPAddr(object): self._plen = 0 self._maskplen = None self._raw = "" + + if cidr == -1: + ipstr, cidr = self.__wrap_ipstr(ipstr) + for family in [socket.AF_INET, socket.AF_INET6]: try: - binary = socket.inet_pton(family, ipstring) + binary = socket.inet_pton(family, ipstr) self._family = family break except socket.error: @@ -200,18 +216,13 @@ class IPAddr(object): # string couldn't be converted neither to a IPv4 nor # to a IPv6 address - retain raw input for later use # (e.g. DNS resolution) - self._raw = ipstring + self._raw = ipstr def __repr__(self): - if self.isIPv4 and self.plen < 32: - return "%s/%d" % (self.ntoa, self.plen) - elif self.isIPv6 and self.plen < 128: - return "%s/%d" % (self.ntoa, self.plen) - else: - return self.ntoa + return self.ntoa def __str__(self): - return self.__repr__() + return self.ntoa @property def addr(self): @@ -280,10 +291,10 @@ class IPAddr(object): def hexdump(self): """Hex representation of the IP address (for debug purposes) """ - if self.family == socket.AF_INET: - return "%08x" % self.addr - elif self.family == socket.AF_INET6: - return "%032x" % self.addr + if self._family == socket.AF_INET: + return "%08x" % self._addr + elif self._family == socket.AF_INET6: + return "%032x" % self._addr else: return "" @@ -293,18 +304,23 @@ class IPAddr(object): """ represent IP object as text like the deprecated C pendant inet.ntoa but address family independent """ + add = '' if self.isIPv4: # convert network to host byte order binary = struct.pack("!L", self._addr) + if self._plen and self._plen < 32: + add = "/%d" % self._plen elif self.isIPv6: # convert network to host byte order - hi = self.addr >> 64 - lo = self.addr & 0xFFFFFFFFFFFFFFFFL + hi = self._addr >> 64 + lo = self._addr & 0xFFFFFFFFFFFFFFFFL binary = struct.pack("!QQ", hi, lo) + if self._plen and self._plen < 128: + add = "/%d" % self._plen else: return self._raw - - return socket.inet_ntop(self.family, binary) + + return socket.inet_ntop(self._family, binary) + add def getPTR(self, suffix=""): """ return the DNS PTR string of the provided IP address object diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 44e06067..2d82ab6f 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -1446,6 +1446,17 @@ class DNSUtilsNetworkTests(unittest.TestCase): # compare with string direct: self.assertEqual(d, d2) + def testIPAddr_CIDR(self): + self.assertEqual(str(IPAddr('93.184.0.1', 24)), '93.184.0.0/24') + self.assertEqual(str(IPAddr('192.168.1.0/255.255.255.128')), '192.168.1.0/25') + self.assertEqual(IPAddr('93.184.0.1', 24).ntoa, '93.184.0.0/24') + self.assertEqual(IPAddr('192.168.1.0/255.255.255.128').ntoa, '192.168.1.0/25') + + self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8::', 120)), '2606:2800:220:1:248:1893:25c8:0/120') + self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8::', 120).ntoa, '2606:2800:220:1:248:1893:25c8:0/120') + self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8:0/120')), '2606:2800:220:1:248:1893:25c8:0/120') + self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:0/120').ntoa, '2606:2800:220:1:248:1893:25c8:0/120') + def testIPAddr_CompareDNS(self): ips = IPAddr('example.com') self.assertTrue(IPAddr("93.184.216.34").isInNet(ips))