fail2ban/fail2ban/server/ipdns.py

348 lines
8.8 KiB
Python

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = "Fail2Ban Developers, Alexander Koeppe, Serg G. Brester, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004-2016 Fail2ban Developers"
__license__ = "GPL"
import socket
import struct
import re
from .utils import Utils
from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Helper functions
#
#
def asip(ip):
	"""Coerce *ip* to an IPAddr instance.

	An existing IPAddr instance and None are handed back untouched; any
	other value is passed to the IPAddr constructor.
	"""
	if isinstance(ip, IPAddr) or ip is None:
		return ip
	return IPAddr(ip)
##
# Utils class for DNS handling.
#
# This class contains only static methods used to handle DNS
#
class DNSUtils:
	"""Static helpers for forward and reverse DNS resolution.

	Both lookup directions memoize their results (including failures) in
	class-level caches, so that a wrong name or a slow DNS system is not
	queried again for every occurrence.
	"""

	# todo: make configurable the expired time and max count of cache entries:
	# NOTE(review): maxTime is presumably expressed in seconds (5 minutes) --
	# confirm against Utils.Cache.
	CACHE_nameToIp = Utils.Cache(maxCount=1000, maxTime=5*60)
	CACHE_ipToName = Utils.Cache(maxCount=1000, maxTime=5*60)

	@staticmethod
	def dnsToIp(dns):
		""" Convert a DNS into an IP address using the Python socket module.
			Thanks to Kevin Drapel.

			Returns a list of valid IPAddr objects; the list is empty if the
			name could not be resolved. The result (also an empty one) is
			stored in the name-to-IP cache.
		"""
		# cache, also prevent long wait during retrieving of ip for wrong dns or lazy dns-system:
		ips = DNSUtils.CACHE_nameToIp.get(dns)
		if ips is not None:
			return ips
		# retrieve ips
		ips = list()
		try:
			for result in socket.getaddrinfo(dns, None, 0, 0, socket.IPPROTO_TCP):
				# result[4] is the sockaddr tuple, its first item the address string
				ip = IPAddr(result[4][0])
				if ip.isValid:
					ips.append(ip)
		except (socket.error, UnicodeError) as e:
			# UnicodeError: the name is arbitrary text and may be rejected by
			# the IDNA encoding (e.g. empty or too long label) before any
			# lookup is attempted
			logSys.warning("Unable to find a corresponding IP address for %s: %s", dns, e)
			# drop partial results
			ips = list()
		# todo: make configurable the expired time of cache entry:
		DNSUtils.CACHE_nameToIp.set(dns, ips)
		return ips

	@staticmethod
	def ipToName(ip):
		""" Return the host name found by reverse lookup of an IP.

			`ip` may be an IPAddr object or an address string. Returns None
			if no name could be found; this negative result is cached too.
		"""
		# cache, also prevent long wait during retrieving of name for wrong addresses, lazy dns:
		# the empty tuple is used as "not cached" marker, because None is a
		# legitimate cached value (failed lookup)
		v = DNSUtils.CACHE_ipToName.get(ip, ())
		if v != ():
			return v
		# retrieve name
		try:
			if not isinstance(ip, IPAddr):
				v = socket.gethostbyaddr(ip)[0]
			else:
				v = socket.gethostbyaddr(ip.ntoa)[0]
		except socket.error as e:
			logSys.debug("Unable to find a name for the IP %s: %s", ip, e)
			v = None
		DNSUtils.CACHE_ipToName.set(ip, v)
		return v

	@staticmethod
	def textToIp(text, useDns):
		""" Return the IP of DNS found in a given text.

			`text` is first checked for being a plain IP address. Only if
			that yields nothing and `useDns` is "yes" or "warn", the text is
			resolved as a host name ("warn" additionally logs a warning for
			every successful resolution). Returns a list of IPAddr objects,
			possibly empty.
		"""
		ipList = list()
		# Search for plain IP
		plainIP = IPAddr.searchIP(text)
		if plainIP is not None:
			ip = IPAddr(plainIP.group(0))
			if ip.isValid:
				ipList.append(ip)
		# If we are allowed to resolve -- give it a try if nothing was found
		if useDns in ("yes", "warn") and not ipList:
			# Try to get IP from possible DNS
			ip = DNSUtils.dnsToIp(text)
			ipList.extend(ip)
			if ip and useDns == "warn":
				logSys.warning("Determined IP using DNS Lookup: %s = %s",
					text, ipList)
		return ipList
##
# Class for IP address handling.
#
# This class contains methods for handling IPv4 and IPv6 addresses.
#
class IPAddr(object):
	"""Encapsulate functionality for IPv4 and IPv6 addresses

	The address is stored as an integer together with its address family
	and prefix length. If the given string is neither a valid IPv4 nor a
	valid IPv6 address, the instance is marked as not valid and only keeps
	the raw input string.
	"""

	IP_CRE = re.compile(r"^(?:\d{1,3}\.){3}\d{1,3}$")
	# NOTE(review): "^" belongs to the first alternative only and "$" to the
	# second one only, so the first alternative is not anchored at the end.
	# Pattern intentionally left unchanged.
	IP6_CRE = re.compile(r"^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$")

	# object attributes (defaults describe a not valid address)
	_addr = 0
	_family = socket.AF_UNSPEC
	_plen = 0
	_isValid = False
	_raw = ""

	# object methods
	def __init__(self, ipstring, cidr=-1):
		""" initialize IP object by converting IP address string
			to binary to integer

			If `cidr` is a non-negative number it is used as prefix length
			and the host portion of the address is masked out. Without a
			prefix length, an IPv6 address inside ::ffff:0:0/96 is converted
			to the embedded IPv4 address.
		"""
		for family in [socket.AF_INET, socket.AF_INET6]:
			try:
				binary = socket.inet_pton(family, ipstring)
			except socket.error:
				continue
			else:
				self._isValid = True
				break

		if self.isValid and family == socket.AF_INET:
			# unpack the 4 bytes (network byte order) to an integer
			self._addr, = struct.unpack("!L", binary)
			self._family = family
			self._plen = 32

			# mask out host portion if prefix length is supplied
			if cidr is not None and cidr >= 0:
				mask = ~(0xFFFFFFFF >> cidr)
				self._addr &= mask
				self._plen = cidr

		elif self.isValid and family == socket.AF_INET6:
			# unpack the 16 bytes (network byte order) to a 128 bit integer
			hi, lo = struct.unpack("!QQ", binary)
			self._addr = (hi << 64) | lo
			self._family = family
			self._plen = 128

			# mask out host portion if prefix length is supplied
			if cidr is not None and cidr >= 0:
				mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF >> cidr)
				self._addr &= mask
				self._plen = cidr

			# if IPv6 address is a IPv4-compatible, make instance a IPv4
			# (the module constant itself is created with a prefix length,
			# so this branch is not reached before the constant exists)
			elif self.isInNet(_IPv6_v4COMPAT):
				self._addr = lo & 0xFFFFFFFF
				self._family = socket.AF_INET
				self._plen = 32
		else:
			# string couldn't be converted neither to a IPv4 nor
			# to a IPv6 address - retain raw input for later use
			# (e.g. DNS resolution)
			self._raw = ipstring

	def __repr__(self):
		return self.ntoa

	def __str__(self):
		return self.ntoa

	@property
	def addr(self):
		"""The address as integer (0 if not valid)
		"""
		return self._addr

	@property
	def family(self):
		"""The address family (socket.AF_UNSPEC if not valid)
		"""
		return self._family

	@property
	def plen(self):
		"""The prefix length
		"""
		return self._plen

	@property
	def raw(self):
		"""The raw address

		Should only be set to a non-empty string if prior address
		conversion wasn't possible
		"""
		return self._raw

	@property
	def isValid(self):
		"""Either the object corresponds to a valid IP address
		"""
		return self._isValid

	def __eq__(self, other):
		"""Two not valid objects are equal if their raw strings are equal,
		two valid objects if address, family and prefix length are equal
		"""
		if not isinstance(other, IPAddr):
			# let Python try the reflected operation instead of failing
			# with an AttributeError on the foreign operand
			return NotImplemented
		if not (self.isValid or other.isValid):
			return self.raw == other.raw
		return (
			(self.isValid and other.isValid) and
			(self.addr == other.addr) and
			(self.family == other.family) and
			(self.plen == other.plen)
		)

	def __ne__(self, other):
		return not (self == other)

	def __lt__(self, other):
		"""Order by address family first, then by address
		"""
		if not isinstance(other, IPAddr):
			return NotImplemented
		# compare as tuple, so that "a < b" and "b < a" can never both hold
		return (self.family, self.addr) < (other.family, other.addr)

	def __add__(self, other):
		return "%s%s" % (self, other)

	def __radd__(self, other):
		return "%s%s" % (other, self)

	def __hash__(self):
		# raw is taken into account, otherwise all not valid objects (which
		# share the same default addr, plen and family) would get one and
		# the same hash value
		return hash(self.addr) ^ hash((self.plen << 16) | self.family) ^ hash(self.raw)

	@property
	def hexdump(self):
		"""Hex representation of the IP address (for debug purposes)
		"""
		if self.family == socket.AF_INET:
			return "%08x" % self.addr
		elif self.family == socket.AF_INET6:
			return "%032x" % self.addr
		else:
			return ""

	# TODO: could be lazily evaluated
	@property
	def ntoa(self):
		""" represent IP object as text like the deprecated
			C pendant inet.ntoa but address family independent

			For a not valid object the raw input string is returned.
		"""
		if self.isIPv4:
			# pack the integer to 4 bytes in network byte order
			binary = struct.pack("!L", self._addr)
		elif self.isIPv6:
			# pack the integer to 16 bytes in network byte order
			hi = self.addr >> 64
			lo = self.addr & 0xFFFFFFFFFFFFFFFF
			binary = struct.pack("!QQ", hi, lo)
		else:
			return self.raw

		return socket.inet_ntop(self.family, binary)

	def getPTR(self, suffix=""):
		""" return the DNS PTR string of the provided IP address object

			If "suffix" is provided it will be appended as the second and top
			level reverse domain.
			If omitted it is implicitly set to the second and top level reverse
			domain of the according IP address family

			An empty string is returned for a not valid object.
		"""
		if self.isIPv4:
			exploded_ip = self.ntoa.split(".")
			if not suffix:
				suffix = "in-addr.arpa."
		elif self.isIPv6:
			# hexdump is a property (not a method); iterating the string
			# yields the single hex digits (nibbles)
			exploded_ip = self.hexdump
			if not suffix:
				suffix = "ip6.arpa."
		else:
			return ""

		return "%s.%s" % (".".join(reversed(exploded_ip)), suffix)

	@property
	def isIPv4(self):
		"""Either the IP object is of address family AF_INET
		"""
		return self.family == socket.AF_INET

	@property
	def isIPv6(self):
		"""Either the IP object is of address family AF_INET6
		"""
		return self.family == socket.AF_INET6

	def isInNet(self, net):
		"""Return either the IP object is in the provided network

		`net` is an IPAddr object; its prefix length defines the network
		mask. Addresses of different families never match.
		"""
		if self.family != net.family:
			return False
		if self.isIPv4:
			mask = ~(0xFFFFFFFF >> net.plen)
		elif self.isIPv6:
			mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF >> net.plen)
		else:
			return False

		return self.addr & mask == net.addr

	@staticmethod
	def masktoplen(mask):
		"""Convert mask string to prefix length

		To be used only for IPv4 masks

		`mask` may be an IPAddr object or a mask string
		(e.g. "255.255.255.0").
		"""
		if not isinstance(mask, IPAddr):
			mask = IPAddr(mask)
		mask = mask.addr # to avoid side-effect within original mask
		plen = 0
		# shift the bits out to the left until nothing remains
		while mask:
			mask = (mask << 1) & 0xFFFFFFFF
			plen += 1
		return plen

	@staticmethod
	def searchIP(text):
		"""Search if text is an IP address, and return it if so, else None

		The returned value is the regex match object.
		"""
		return IPAddr.IP_CRE.match(text) or IPAddr.IP6_CRE.match(text)


# An IPv4 compatible IPv6 to be reused
_IPv6_v4COMPAT = IPAddr("::ffff:0:0", 96)