2nd wave: code review, simplification, pythonization, etc. (test cases passed)

pull/1414/head
sebres 2016-05-10 10:08:20 +02:00
parent c1a54974e9
commit 9b06c325e1
2 changed files with 77 additions and 35 deletions

View File

@ -38,7 +38,9 @@ logSys = getLogger(__name__)
# #
def asip(ip): def asip(ip):
"""A little helper to guarantee ip being an IPAddr instance""" """A little helper to guarantee ip being an IPAddr instance"""
return ip if isinstance(ip, IPAddr) or ip is None else IPAddr(ip) if isinstance(ip, IPAddr):
return ip
return IPAddr(ip)
## ##
@ -122,38 +124,53 @@ class DNSUtils:
# #
# This class contains methods for handling IPv4 and IPv6 addresses. # This class contains methods for handling IPv4 and IPv6 addresses.
# #
class IPAddr: class IPAddr(object):
"""Encapsulate functionality for IPv4 and IPv6 addresses """Encapsulate functionality for IPv4 and IPv6 addresses
""" """
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$") IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
IP6_CRE = re.compile("^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$") IP6_CRE = re.compile("^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$")
# An IPv4 compatible IPv6 to be reused (see below)
IP6_4COMPAT = None
# object attributes # object attributes
_addr = 0 __slots__ = '_family','_addr','_plen','_maskplen','_raw'
_family = socket.AF_UNSPEC
_plen = 0 # todo: make configurable the expired time and max count of cache entries:
_isValid = False CACHE_OBJ = Utils.Cache(maxCount=1000, maxTime=5*60)
_raw = ""
def __new__(cls, ipstring, cidr=-1):
# already correct IPAddr
args = (ipstring, cidr)
ip = IPAddr.CACHE_OBJ.get(args)
if ip is not None:
return ip
ip = super(IPAddr, cls).__new__(cls)
ip.__init(ipstring, cidr)
IPAddr.CACHE_OBJ.set(args, ip)
return ip
# object methods # object methods
def __init__(self, ipstring, cidr=-1): def __init(self, ipstring, cidr=-1):
""" initialize IP object by converting IP address string """ initialize IP object by converting IP address string
to binary to integer to binary to integer
""" """
self._family = socket.AF_UNSPEC
self._addr = 0
self._plen = 0
self._maskplen = None
self._raw = ""
for family in [socket.AF_INET, socket.AF_INET6]: for family in [socket.AF_INET, socket.AF_INET6]:
try: try:
binary = socket.inet_pton(family, ipstring) binary = socket.inet_pton(family, ipstring)
self._family = family
break
except socket.error: except socket.error:
continue continue
else:
self._isValid = True
break
if self.isValid and family == socket.AF_INET: if self._family == socket.AF_INET:
# convert host to network byte order # convert host to network byte order
self._addr, = struct.unpack("!L", binary) self._addr, = struct.unpack("!L", binary)
self._family = family
self._plen = 32 self._plen = 32
# mask out host portion if prefix length is supplied # mask out host portion if prefix length is supplied
@ -162,11 +179,10 @@ class IPAddr:
self._addr &= mask self._addr &= mask
self._plen = cidr self._plen = cidr
elif self.isValid and family == socket.AF_INET6: elif self._family == socket.AF_INET6:
# convert host to network byte order # convert host to network byte order
hi, lo = struct.unpack("!QQ", binary) hi, lo = struct.unpack("!QQ", binary)
self._addr = (hi << 64) | lo self._addr = (hi << 64) | lo
self._family = family
self._plen = 128 self._plen = 128
# mask out host portion if prefix length is supplied # mask out host portion if prefix length is supplied
@ -176,7 +192,7 @@ class IPAddr:
self._plen = cidr self._plen = cidr
# if IPv6 address is a IPv4-compatible, make instance a IPv4 # if IPv6 address is a IPv4-compatible, make instance a IPv4
elif self.isInNet(_IPv6_v4COMPAT): elif self.isInNet(IPAddr.IP6_4COMPAT):
self._addr = lo & 0xFFFFFFFFL self._addr = lo & 0xFFFFFFFFL
self._family = socket.AF_INET self._family = socket.AF_INET
self._plen = 32 self._plen = 32
@ -217,32 +233,43 @@ class IPAddr:
def isValid(self): def isValid(self):
"""Either the object corresponds to a valid IP address """Either the object corresponds to a valid IP address
""" """
return self._isValid return self._family != socket.AF_UNSPEC
def __eq__(self, other): def __eq__(self, other):
if not (self.isValid or other.isValid): if not isinstance(other, IPAddr):
return self.raw == other.raw if other is None: return False
other = IPAddr(other)
if self._family != other._family: return False
if self._family == socket.AF_UNSPEC:
return self._raw == other._raw
return ( return (
(self.isValid and other.isValid) and (self._addr == other._addr) and
(self.addr == other.addr) and (self._plen == other._plen)
(self.family == other.family) and
(self.plen == other.plen)
) )
def __ne__(self, other): def __ne__(self, other):
return not (self == other) return not (self == other)
def __lt__(self, other): def __lt__(self, other):
return self.family < other.family or self.addr < other.addr if not isinstance(other, IPAddr):
if other is None: return False
other = IPAddr(other)
return self._family < other._family or self._addr < other._addr
def __add__(self, other): def __add__(self, other):
if not isinstance(other, IPAddr):
other = IPAddr(other)
return "%s%s" % (self, other) return "%s%s" % (self, other)
def __radd__(self, other): def __radd__(self, other):
if not isinstance(other, IPAddr):
other = IPAddr(other)
return "%s%s" % (other, self) return "%s%s" % (other, self)
def __hash__(self): def __hash__(self):
return hash(self.addr) ^ hash((self.plen << 16) | self.family) # should be the same as by string (because of possible compare with string):
return hash(self.ntoa)
#return hash(self._addr)^hash((self._plen<<16)|self._family)
@property @property
def hexdump(self): def hexdump(self):
@ -270,7 +297,7 @@ class IPAddr:
lo = self.addr & 0xFFFFFFFFFFFFFFFFL lo = self.addr & 0xFFFFFFFFFFFFFFFFL
binary = struct.pack("!QQ", hi, lo) binary = struct.pack("!QQ", hi, lo)
else: else:
return self.raw return self._raw
return socket.inet_ntop(self.family, binary) return socket.inet_ntop(self.family, binary)
@ -310,6 +337,11 @@ class IPAddr:
def isInNet(self, net): def isInNet(self, net):
"""Return either the IP object is in the provided network """Return either the IP object is in the provided network
""" """
# if it isn't a valid IP address, try DNS resolution
if not net.isValid and net.raw != "":
# Check if IP in DNS
return self in DNSUtils.dnsToIp(net.raw)
if self.family != net.family: if self.family != net.family:
return False return False
if self.isIPv4: if self.isIPv4:
@ -319,7 +351,21 @@ class IPAddr:
else: else:
return False return False
return self.addr & mask == net.addr return (self.addr & mask) == net.addr
@property
def maskplen(self):
plen = 0
if self._maskplen is not None:
return self._plen
maddr = self.addr
while maddr:
if not (maddr & 0x80000000):
raise ValueError("invalid mask %r, no plen representation" % (str(self),))
maddr = (maddr << 1) & 0xFFFFFFFFL
plen += 1
self._maskplen = plen
return plen
@staticmethod @staticmethod
def masktoplen(mask): def masktoplen(mask):
@ -327,12 +373,7 @@ class IPAddr:
To be used only for IPv4 masks To be used only for IPv4 masks
""" """
mask = mask.addr # to avoid side-effect within original mask return IPAddr(mask).maskplen
plen = 0
while mask:
mask = (mask << 1) & 0xFFFFFFFFL
plen += 1
return plen
@staticmethod @staticmethod
def searchIP(text): def searchIP(text):
@ -343,5 +384,6 @@ class IPAddr:
match = IPAddr.IP6_CRE.match(text) match = IPAddr.IP6_CRE.match(text)
return match if match else None return match if match else None
# An IPv4 compatible IPv6 to be reused # An IPv4 compatible IPv6 to be reused
_IPv6_v4COMPAT = IPAddr("::ffff:0:0", 96) IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96)

View File

@ -25,7 +25,7 @@ import unittest
from ..client.beautifier import Beautifier from ..client.beautifier import Beautifier
from ..version import version from ..version import version
from ..ipaddr import IPAddr from ..server.ipdns import IPAddr
class BeautifierTest(unittest.TestCase): class BeautifierTest(unittest.TestCase):