RF/ENH: 1st wave of IPAddr pythonization - properties, logical statements, etc

pull/1374/head
Yaroslav Halchenko 2016-04-23 20:05:27 -04:00 committed by Alexander Koeppe
parent 8f035c13ad
commit 42144b05a0
8 changed files with 124 additions and 121 deletions

View File

@ -201,6 +201,7 @@ fail2ban/tests/actionstestcase.py
fail2ban/tests/actiontestcase.py
fail2ban/tests/banmanagertestcase.py
fail2ban/tests/clientreadertestcase.py
fail2ban/tests/clientbeautifiertestcase.py
fail2ban/tests/config/action.d/brokenaction.conf
fail2ban/tests/config/fail2ban.conf
fail2ban/tests/config/filter.d/simple.conf

View File

@ -31,6 +31,7 @@ import re
from functools import wraps
##
# Helper functions / decorator
# Thanks to Yaroslav Halchenko (yarikoptic)
@ -39,6 +40,7 @@ def asip(ip):
"""A little helper to guarantee ip being an IPAddr instance"""
return ip if isinstance(ip, IPAddr) or ip is None else IPAddr(ip)
def iparg(f):
"""A helper decorator to simplify use of asip throughout the code"""
args = inspect.getargspec(f).args
@ -64,18 +66,18 @@ def iparg(f):
# This class contains methods for handling IPv4 and IPv6 addresses.
class IPAddr:
""" provide functions to handle IPv4 and IPv6 addresses
"""Encapsulate functionality for IPv4 and IPv6 addresses
"""
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
IP6_CRE = re.compile("^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$")
# object attributes
addr = 0
family = socket.AF_UNSPEC
plen = 0
valid = False
raw = ""
_addr = 0
_family = socket.AF_UNSPEC
_plen = 0
_isValid = False
_raw = ""
# object methods
def __init__(self, ipstring, cidr=-1):
@ -88,67 +90,92 @@ class IPAddr:
except socket.error:
continue
else:
self.valid = True
self._isValid = True
break
if self.valid and family == socket.AF_INET:
if self.isValid and family == socket.AF_INET:
# convert host to network byte order
self.addr, = struct.unpack("!L", binary)
self.family = family
self.plen = 32
self._addr, = struct.unpack("!L", binary)
self._family = family
self._plen = 32
# mask out host portion if prefix length is supplied
if cidr != None and cidr >= 0:
if cidr is not None and cidr >= 0:
mask = ~(0xFFFFFFFFL >> cidr)
self.addr = self.addr & mask
self.plen = cidr
self._addr &= mask
self._plen = cidr
elif self.valid and family == socket.AF_INET6:
elif self.isValid and family == socket.AF_INET6:
# convert host to network byte order
hi, lo = struct.unpack("!QQ", binary)
self.addr = (hi << 64) | lo
self.family = family
self.plen = 128
self._addr = (hi << 64) | lo
self._family = family
self._plen = 128
# mask out host portion if prefix length is supplied
if cidr != None and cidr >= 0:
if cidr is not None and cidr >= 0:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> cidr)
self.addr = self.addr & mask
self.plen = cidr
self._addr &= mask
self._plen = cidr
# if IPv6 address is a IPv4-compatible, make instance a IPv4
elif self.isInNet(IPAddr("::ffff:0:0", 96)):
self.addr = lo & 0xFFFFFFFFL
self.family = socket.AF_INET
self.plen = 32
elif self.isInNet(_IPv6_v4COMPAT):
self._addr = lo & 0xFFFFFFFFL
self._family = socket.AF_INET
self._plen = 32
else:
# string couldn't be converted neither to a IPv4 nor
# to a IPv6 address - retain raw input for later use
# (e.g. DNS resolution)
self.raw = ipstring
self._raw = ipstring
def __repr__(self):
return self.ntoa()
return self.ntoa
def __str__(self):
return self.ntoa()
return self.ntoa
@property
def addr(self):
return self._addr
@property
def family(self):
return self._family
@property
def plen(self):
return self._plen
@property
def raw(self):
"""The raw address
Should only be set to a non-empty string if prior address
conversion wasn't possible
"""
return self._raw
@property
def isValid(self):
"""Either the object corresponds to a valid IP address
"""
return self._isValid
@iparg
def __eq__(self, other):
if not self.valid and not other.valid: return self.raw == other.raw
if not self.valid or not other.valid: return False
if self.addr != other.addr: return False
if self.family != other.family: return False
if self.plen != other.plen: return False
return True
if not (self.isValid or other.isValid):
return self.raw == other.raw
return (
(self.isValid and other.isValid) and
(self.addr == other.addr) and
(self.family == other.family) and
(self.plen == other.plen)
)
@iparg
def __ne__(self, other):
if not self.valid and not other.valid: return self.raw != other.raw
if self.addr != other.addr: return True
if self.family != other.family: return True
if self.plen != other.plen: return True
return False
return not (self == other)
@iparg
def __lt__(self, other):
@ -163,11 +190,11 @@ class IPAddr:
return "%s%s" % (other, self)
def __hash__(self):
return hash(self.addr)^hash((self.plen<<16)|self.family)
return hash(self.addr) ^ hash((self.plen << 16) | self.family)
@property
def hexdump(self):
""" dump the ip address in as a hex sequence in
network byte order - for debug purpose
"""Hex representation of the IP address (for debug purposes)
"""
if self.family == socket.AF_INET:
return "%08x" % self.addr
@ -175,120 +202,96 @@ class IPAddr:
return "%032x" % self.addr
else:
return ""
# TODO: could be lazily evaluated
@property
def ntoa(self):
""" represent IP object as text like the depricated
C pendant inet_ntoa() but address family independent
""" represent IP object as text like the deprecated
C pendant inet.ntoa but address family independent
"""
if self.family == socket.AF_INET:
if self.isIPv4:
# convert network to host byte order
binary = struct.pack("!L", self.addr)
elif self.family == socket.AF_INET6:
binary = struct.pack("!L", self._addr)
elif self.isIPv6:
# convert network to host byte order
hi = self.addr >> 64
lo = self.addr & 0xFFFFFFFFFFFFFFFFL
binary = struct.pack("!QQ", hi, lo)
else:
return self.getRaw()
return self.raw
return socket.inet_ntop(self.family, binary)
def getPTR(self, suffix=""):
""" generates the DNS PTR string of the provided IP address object
if "suffix" is provided it will be appended as the second and top
""" return the DNS PTR string of the provided IP address object
If "suffix" is provided it will be appended as the second and top
level reverse domain.
if omitted it is implicitely set to the second and top level reverse
If omitted it is implicitly set to the second and top level reverse
domain of the according IP address family
"""
if self.family == socket.AF_INET:
reversed_ip = ".".join(reversed(self.ntoa().split(".")))
if self.isIPv4:
exploded_ip = self.ntoa.split(".")
if not suffix:
suffix = "in-addr.arpa."
return "%s.%s" % (reversed_ip, suffix)
elif self.family == socket.AF_INET6:
reversed_ip = ".".join(reversed(self.hexdump()))
elif self.isIPv6:
exploded_ip = self.hexdump()
if not suffix:
suffix = "ip6.arpa."
return "%s.%s" % (reversed_ip, suffix)
suffix = "ip6.arpa."
else:
return ""
return "%s.%s" % (".".join(reversed(exploded_ip)), suffix)
@property
def isIPv4(self):
""" return true if the IP object is of address family AF_INET
"""Either the IP object is of address family AF_INET
"""
return self.family == socket.AF_INET
@property
def isIPv6(self):
""" return true if the IP object is of address family AF_INET6
"""Either the IP object is of address family AF_INET6
"""
return self.family == socket.AF_INET6
def getRaw(self):
""" returns the raw attribute - should only be set
to a non-empty string if prior address conversion
wasn't possible
"""
return self.raw
def isValidIP(self):
""" returns true if the IP object has been created
from a valid IP address or false if not
"""
return self.valid
@iparg
def isInNet(self, net):
""" returns true if the IP object is in the provided
network (object)
"""Return either the IP object is in the provided network
"""
if self.family != net.family:
return False
if self.family == socket.AF_INET:
if self.isIPv4:
mask = ~(0xFFFFFFFFL >> net.plen)
elif self.family == socket.AF_INET6:
elif self.isIPv6:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> net.plen)
else:
return False
if self.addr & mask == net.addr:
return True
return self.addr & mask == net.addr
return False
@staticmethod
@iparg
def masktoplen(mask):
""" converts mask string to prefix length
only used for IPv4 masks
"""Convert mask string to prefix length
To be used only for IPv4 masks
"""
mask = mask.addr # to avoid side-effect within original mask
plen = 0
while mask.addr:
mask.addr = (mask.addr << 1) & 0xFFFFFFFFL
while mask:
mask = (mask << 1) & 0xFFFFFFFFL
plen += 1
return plen
@staticmethod
def searchIP(text):
""" Search if an IP address if directly available and return
it.
"""Search if text is an IP address, and return it if so, else None
"""
match = IPAddr.IP_CRE.match(text)
if match:
return match
else:
if not match:
match = IPAddr.IP6_CRE.match(text)
if match:
return match
else:
return None
return match if match else None
# An IPv4 compatible IPv6 to be reused
_IPv6_v4COMPAT = IPAddr("::ffff:0:0", 96)

View File

@ -346,7 +346,7 @@ class CommandAction(ActionBase):
the ban.
"""
banaction = self.actionban
if "ip" in aInfo and aInfo["ip"] and aInfo["ip"].isIPv6() and self.actionban6:
if "ip" in aInfo and aInfo["ip"] and aInfo["ip"].isIPv6 and self.actionban6:
banaction = self.actionban6
if not self._processCmd(banaction, aInfo):
raise RuntimeError("Error banning %(ip)s" % aInfo)
@ -386,7 +386,7 @@ class CommandAction(ActionBase):
the ban.
"""
unbanaction = self.actionunban
if "ip" in aInfo and aInfo["ip"] and aInfo["ip"].isIPv6() and self.actionunban6:
if "ip" in aInfo and aInfo["ip"] and aInfo["ip"].isIPv6 and self.actionunban6:
unbanaction = self.actionunban6
if not self._processCmd(unbanaction, aInfo):

View File

@ -152,8 +152,9 @@ class BanManager:
for banData in self.__banList:
ip = banData.getIP()
# Reference: http://www.team-cymru.org/Services/ip-to-asn.html#dns
question = ip.getPTR("origin.asn.cymru.com" if ip.isIPv4()
else "origin6.asn.cymru.com"
question = ip.getPTR(
"origin.asn.cymru.com" if ip.isIPv4
else "origin6.asn.cymru.com"
)
try:
answers = dns.resolver.query(question, "TXT")

View File

@ -414,7 +414,7 @@ class Fail2BanDb(object):
#TODO: Implement data parts once arbitrary match keys completed
cur.execute(
"INSERT INTO bans(jail, ip, timeofban, data) VALUES(?, ?, ?, ?)",
(jail.name, ticket.getIP().ntoa(), int(round(ticket.getTime())),
(jail.name, ticket.getIP().ntoa, int(round(ticket.getTime())),
{"matches": ticket.getMatches(),
"failures": ticket.getAttempt()}))
@ -429,7 +429,7 @@ class Fail2BanDb(object):
ip : str
IP to be removed.
"""
queryArgs = (jail.name, ip.ntoa());
queryArgs = (jail.name, ip.ntoa);
cur.execute(
"DELETE FROM bans WHERE jail = ? AND ip = ?",
queryArgs);
@ -447,7 +447,7 @@ class Fail2BanDb(object):
queryArgs.append(MyTime.time() - bantime)
if ip is not None:
query += " AND ip=?"
queryArgs.append(ip.ntoa())
queryArgs.append(ip.ntoa)
query += " ORDER BY ip, timeofban"
return cur.execute(query, queryArgs)

View File

@ -387,9 +387,9 @@ class Filter(JailThread):
def inIgnoreIPList(self, ip, log_ignore=False):
for net in self.__ignoreIpList:
# if it isn't a valid IP address, try DNS resolution
if not net.isValidIP() and net.getRaw() != "":
if not net.isValid and net.raw != "":
# Check if IP in DNS
ips = DNSUtils.dnsToIp(net.getRaw())
ips = DNSUtils.dnsToIp(net.raw)
if ip in ips:
self.logIgnoreIp(ip, log_ignore, ignore_source="dns")
return True
@ -875,7 +875,7 @@ class DNSUtils:
for result in socket.getaddrinfo(dns, None, 0, 0,
socket.IPPROTO_TCP):
ip = IPAddr(result[4][0])
if ip.isValidIP():
if ip.isValid:
ips.append(ip)
return ips
@ -888,7 +888,7 @@ class DNSUtils:
@iparg
def ipToName(ip):
try:
return socket.gethostbyaddr(ip.ntoa())[0]
return socket.gethostbyaddr(ip.ntoa)[0]
except socket.error, e:
logSys.debug("Unable to find a name for the IP %s: %s" % (ip, e))
return None
@ -900,9 +900,9 @@ class DNSUtils:
ipList = list()
# Search for plain IP
plainIP = IPAddr.searchIP(text)
if not plainIP is None:
if plainIP is not None:
ip = IPAddr(plainIP.group(0))
if ip.isValidIP():
if ip.isValid:
ipList.append(ip)
# If we are allowed to resolve -- give it a try if nothing was found

View File

@ -51,7 +51,7 @@ class Ticket:
def __str__(self):
return "%s: ip=%s time=%s #attempts=%d matches=%r" % \
(self.__class__.__name__.split('.')[-1],
self.__ip.ntoa(), self.__time, self.__attempt,
self.__ip.ntoa, self.__time, self.__attempt,
self.__matches)
def __repr__(self):

View File

@ -107,5 +107,3 @@ class BeautifierTest(unittest.TestCase):
self.assertEqual(self.b.beautify(response), output)