New class IPAddr for handling IPv4 and IPv6 addresses

pull/1414/head
Alexander Koeppe 2016-02-28 11:11:24 +01:00 committed by sebres
parent 05f38285f1
commit 01e1383c9b
1 changed files with 239 additions and 4 deletions

View File

@ -433,7 +433,7 @@ class Filter(JailThread):
logSys.debug("Ignore line since time %s < %s - %s",
unixTime, MyTime.time(), self.getFindTime())
break
if self.inIgnoreIPList(ip, log_ignore=True):
if self.inIgnoreIPList(ip.ntoa(), log_ignore=True):
continue
logSys.info(
"[%s] Found %s - %s", self.jail.name, ip, datetime.datetime.fromtimestamp(unixTime).strftime("%Y-%m-%d %H:%M:%S")
@ -530,7 +530,8 @@ class Filter(JailThread):
try:
host = failRegex.getHost()
if returnRawHost:
failList.append([failRegexIndex, host, date,
ipaddr = IPAddr(host)
failList.append([failRegexIndex, ipaddr, date,
failRegex.getMatchedLines()])
if not checkAllRegex:
break
@ -538,8 +539,9 @@ class Filter(JailThread):
ipMatch = DNSUtils.textToIp(host, self.__useDns)
if ipMatch:
for ip in ipMatch:
failList.append([failRegexIndex, ip, date,
failRegex.getMatchedLines()])
ipaddr = IPAddr(ip)
failList.append([failRegexIndex, ipaddr,
date, failRegex.getMatchedLines()])
if not checkAllRegex:
break
except RegexException, e: # pragma: no cover - unsure if reachable
@ -1096,3 +1098,236 @@ class DNSUtils:
""" Convert a binary IPv4 address into string n.n.n.n form.
"""
return socket.inet_ntoa(struct.pack("!L", ipbin))
##
# Class for IP address handling.
#
# This class contains methods for handling IPv4 and IPv6 addresses.
class IPAddr:
""" provide functions to handle IPv4 and IPv6 addresses
"""
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
IP6_CRE = re.compile("^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$")
# object attributes
addr = 0
family = socket.AF_UNSPEC
plen = 0
valid = False
raw = ""
# object methods
def __init__(self, ipstring, cidr=-1):
""" initialize IP object by converting IP address string
to binary to integer
"""
for family in [socket.AF_INET, socket.AF_INET6]:
try:
binary = socket.inet_pton(family, ipstring)
except socket.error:
continue
else:
self.valid = True
break
if self.valid and family == socket.AF_INET:
# convert host to network byte order
self.addr, = struct.unpack("!L", binary)
self.family = family
self.plen = 32
# mask out host portion if prefix length is supplied
if cidr != None and cidr >= 0:
mask = ~(0xFFFFFFFFL >> cidr)
self.addr = self.addr & mask
self.plen = cidr
elif self.valid and family == socket.AF_INET6:
# convert host to network byte order
hi, lo = struct.unpack("!QQ", binary)
self.addr = (hi << 64) | lo
self.family = family
self.plen = 128
# mask out host portion if prefix length is supplied
if cidr != None and cidr >= 0:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> cidr)
self.addr = self.addr & mask
self.plen = cidr
# if IPv6 address is a IPv4-compatible, make instance a IPv4
elif self.isInNet(IPAddr("::ffff:0:0", 96)):
self.addr = lo & 0xFFFFFFFFL
self.family = socket.AF_INET
self.plen = 32
else:
# string couldn't be converted neither to a IPv4 nor
# to a IPv6 address - retain raw input for later use
# (e.g. DNS resolution)
self.raw = ipstring
def __repr__(self):
return self.ntoa()
def __str__(self):
return self.ntoa()
def __eq__(self, other):
other = other if isinstance(other, IPAddr) else IPAddr(other)
if not self.valid and not other.valid: return self.raw == other.raw
if not self.valid or not other.valid: return False
if self.addr != other.addr: return False
if self.family != other.family: return False
if self.plen != other.plen: return False
return True
def __ne__(self, other):
other = other if isinstance(other, IPAddr) else IPAddr(other)
if not self.valid and not other.valid: return self.raw != other.raw
if self.addr != other.addr: return True
if self.family != other.family: return True
if self.plen != other.plen: return True
return False
def __lt__(self, other):
other = other if isinstance(other, IPAddr) else IPAddr(other)
return self.family < other.family or self.addr < other.addr
def __add__(self, other):
return "%s%s" % (self, other)
def __radd__(self, other):
return "%s%s" % (other, self)
def __hash__(self):
return hash(self.addr)^hash((self.plen<<16)|self.family)
def hexdump(self):
""" dump the ip address in as a hex sequence in
network byte order - for debug purpose
"""
if self.family == socket.AF_INET:
return "%08x" % self.addr
elif self.family == socket.AF_INET6:
return "%032x" % self.addr
else:
return ""
def ntoa(self):
""" represent IP object as text like the depricated
C pendant inet_ntoa() but address family independent
"""
if self.family == socket.AF_INET:
# convert network to host byte order
binary = struct.pack("!L", self.addr)
elif self.family == socket.AF_INET6:
# convert network to host byte order
hi = self.addr >> 64
lo = self.addr & 0xFFFFFFFFFFFFFFFFL
binary = struct.pack("!QQ", hi, lo)
else:
return self.getRaw()
return socket.inet_ntop(self.family, binary)
def getPTR(self, suffix=""):
""" generates the DNS PTR string of the provided IP address object
if "suffix" is provided it will be appended as the second and top
level reverse domain.
if omitted it is implicitely set to the second and top level reverse
domain of the according IP address family
"""
if self.family == socket.AF_INET:
reversed_ip = ".".join(reversed(self.ntoa().split(".")))
if not suffix:
suffix = "in-addr.arpa."
return "%s.%s" % (reversed_ip, suffix)
elif self.family == socket.AF_INET6:
reversed_ip = ".".join(reversed(self.hexdump()))
if not suffix:
suffix = "ip6.arpa."
return "%s.%s" % (reversed_ip, suffix)
else:
return ""
def isIPv4(self):
""" return true if the IP object is of address family AF_INET
"""
return True if self.family == socket.AF_INET else False
def isIPv6(self):
""" return true if the IP object is of address family AF_INET6
"""
return True if self.family == socket.AF_INET6 else False
def getRaw(self):
""" returns the raw attribute - should only be set
to a non-empty string if prior address conversion
wasn't possible
"""
return self.raw
def isValidIP(self):
""" returns true if the IP object has been created
from a valid IP address or false if not
"""
return self.valid
def isInNet(self, net):
""" returns true if the IP object is in the provided
network (object)
"""
if self.family != net.family:
return False
if self.family == socket.AF_INET:
mask = ~(0xFFFFFFFFL >> net.plen)
elif self.family == socket.AF_INET6:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> net.plen)
else:
return False
if self.addr & mask == net.addr:
return True
return False
@staticmethod
def masktoplen(maskstr):
""" converts mask string to prefix length
only used for IPv4 masks
"""
mask = IPAddr(maskstr)
plen = 0
while mask.addr:
mask.addr = (mask.addr << 1) & 0xFFFFFFFFL
plen += 1
return plen
@staticmethod
def searchIP(text):
""" Search if an IP address if directly available and return
it.
"""
match = IPAddr.IP_CRE.match(text)
if match:
return match
else:
match = IPAddr.IP6_CRE.match(text)
if match:
return match
else:
return None