correct <HOST> expression in failregex (now IPv6 will be really found from filter);

respect standard spelling of IPv6 (with port) enclosed in brackets ([ipv6]:port), to prevent the necessarily changing of many filter definitions, comparison:
#IPv4:
127.0.0.1:55555
#IPv6:
[a🅱️c:d::1]:55555
some small fixes (in test cases also) and code review and few optimizations;
pull/1414/head
sebres 2016-05-11 15:27:25 +02:00
parent f47fdf8918
commit 23ad50dbaa
6 changed files with 69 additions and 26 deletions

View File

@ -43,8 +43,8 @@ class Regex:
def __init__(self, regex):
self._matchCache = None
# Perform shortcuts expansions.
# Replace "<HOST>" with default regular expression for host.
regex = regex.replace("<HOST>", "(?:::f{4,6}:)?(?P<host>[\w\-.^_]*\w)")
# Resolve "<HOST>" tag using default regular expression for host:
regex = Regex._resolveHostTag(regex)
# Replace "<SKIPLINES>" with regular expression for multiple lines.
regexSplit = regex.split("<SKIPLINES>")
regex = regexSplit[0]
@ -61,6 +61,20 @@ class Regex:
def __str__(self):
return "%s(%r)" % (self.__class__.__name__, self._regex)
@staticmethod
def _resolveHostTag(regex):
# Replace "<HOST>" with default regular expression for host:
# Other candidates (see gh-1374 for the discussion about):
# differentiate: r"""(?:(?:::f{4,6}:)?(?P<IPv4>(?:\d{1,3}\.){3}\d{1,3})|\[?(?P<IPv6>(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):))\]?|(?P<HOST>[\w\-.^_]*\w))"""
# expected many changes in filter, failregex, etc...
# simple: r"""(?:::f{4,6}:)?(?P<host>[\w\-.^_:]*\w)"""
# not good enough, if not precise expressions around <HOST>, because for example will match '1.2.3.4:23930' as ip-address;
# Todo: move this functionality to filter reader, as default <HOST> replacement,
# make it configurable (via jail/filter configs)
return regex.replace("<HOST>",
r"""(?:::f{4,6}:)?(?P<host>(?:\d{1,3}\.){3}\d{1,3}|\[?(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}\]?|(?<=:):)|[\w\-.^_]*\w)""")
##
# Gets the regular expression.
#

View File

@ -103,7 +103,7 @@ class DNSUtils:
# Search for plain IP
plainIP = IPAddr.searchIP(text)
if plainIP is not None:
ip = IPAddr(plainIP.group(0))
ip = IPAddr(plainIP)
if ip.isValid:
ipList.append(ip)
@ -127,9 +127,8 @@ class DNSUtils:
class IPAddr(object):
"""Encapsulate functionality for IPv4 and IPv6 addresses
"""
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
IP6_CRE = re.compile("^[0-9a-fA-F]{4}[0-9a-fA-F:]+:[0-9a-fA-F]{1,4}|::1$")
IP_4_6_CRE = re.compile(
r"""^(?:(?P<IPv4>(?:\d{1,3}\.){3}\d{1,3})|\[?(?P<IPv6>(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):))\]?)$""")
# An IPv4 compatible IPv6 to be reused (see below)
IP6_4COMPAT = None
@ -139,19 +138,33 @@ class IPAddr(object):
# todo: make configurable the expired time and max count of cache entries:
CACHE_OBJ = Utils.Cache(maxCount=1000, maxTime=5*60)
def __new__(cls, ipstring, cidr=-1):
# already correct IPAddr
args = (ipstring, cidr)
def __new__(cls, ipstr, cidr=-1):
# check already cached as IPAddr
args = (ipstr, cidr)
ip = IPAddr.CACHE_OBJ.get(args)
if ip is not None:
return ip
# wrap mask to cidr (correct plen):
if cidr == -1:
ipstr, cidr = IPAddr.__wrap_ipstr(ipstr)
args = (ipstr, cidr)
# check cache again:
if cidr != -1:
ip = IPAddr.CACHE_OBJ.get(args)
if ip is not None:
return ip
ip = super(IPAddr, cls).__new__(cls)
ip.__init(ipstring, cidr)
ip.__init(ipstr, cidr)
IPAddr.CACHE_OBJ.set(args, ip)
return ip
@staticmethod
def __wrap_ipstr(ipstr):
# because of standard spelling of IPv6 (with port) enclosed in brackets ([ipv6]:port),
# remove they now (be sure the <HOST> inside failregex uses this for IPv6 (has \[?...\]?)
if len(ipstr) > 2 and ipstr[0] == '[' and ipstr[-1] == ']':
ipstr = ipstr[1:-1]
# test mask:
if "/" not in ipstr:
return ipstr, -1
s = ipstr.split('/', 1)
@ -173,9 +186,6 @@ class IPAddr(object):
self._maskplen = None
self._raw = ""
if cidr == -1:
ipstr, cidr = self.__wrap_ipstr(ipstr)
for family in [socket.AF_INET, socket.AF_INET6]:
try:
binary = socket.inet_pton(family, ipstr)
@ -376,17 +386,17 @@ class IPAddr(object):
@property
def maskplen(self):
plen = 0
mplen = 0
if self._maskplen is not None:
return self._plen
maddr = self.addr
return self._maskplen
maddr = self._addr
while maddr:
if not (maddr & 0x80000000):
raise ValueError("invalid mask %r, no plen representation" % (str(self),))
maddr = (maddr << 1) & 0xFFFFFFFFL
plen += 1
self._maskplen = plen
return plen
mplen += 1
self._maskplen = mplen
return mplen
@staticmethod
def masktoplen(mask):
@ -400,10 +410,13 @@ class IPAddr(object):
def searchIP(text):
"""Search if text is an IP address, and return it if so, else None
"""
match = IPAddr.IP_CRE.match(text)
match = IPAddr.IP_4_6_CRE.match(text)
if not match:
match = IPAddr.IP6_CRE.match(text)
return match if match else None
return None
ipstr = match.group('IPv4')
if ipstr != '':
return ipstr
return match.group('IPv6')
# An IPv4 compatible IPv6 to be reused

View File

@ -8,6 +8,8 @@
# failJSON: { "time": "2013-07-11T01:21:43", "match": true , "host": "194.228.20.113" }
[Thu Jul 11 01:21:43 2013] [error] [client 194.228.20.113] user dsfasdf not found: /
# failJSON: { "time": "2013-07-11T01:21:44", "match": true , "host": "2606:2800:220:1:248:1893:25c8:1946" }
[Thu Jul 11 01:21:44 2013] [error] [client 2606:2800:220:1:248:1893:25c8:1946] user test-ipv6 not found: /
# The failures below use the configuration described in fail2ban/tests/files/config/apache-auth
#
@ -56,6 +58,8 @@
# failJSON: { "time": "2013-07-20T22:11:43", "match": true , "host": "127.0.0.1" }
[Sat Jul 20 22:11:43.147674 2013] [authz_owner:error] [pid 17540:tid 140122922129152] [client 127.0.0.1:51548] AH01637: Authorization of user username to access /basic/authz_owner/cant_get_me.html failed, reason: file owner dan does not match
# failJSON: { "time": "2013-07-20T22:11:44", "match": true , "host": "2606:2800:220:1:248:1893:25c8:1946" }
[Sat Jul 20 22:11:44.147674 2013] [authz_owner:error] [pid 17540:tid 140122922129152] [client [2606:2800:220:1:248:1893:25c8:1946]:51548] AH01637: Authorization of user test-ipv6 to access /basic/authz_owner/cant_get_me.html failed, reason: file owner dan does not match
# wget --http-user=username --http-password=password http://localhost/basic/authz_owner/cant_get_me.html -O /dev/null
# failJSON: { "time": "2013-07-20T21:42:44", "match": true , "host": "127.0.0.1" }

View File

@ -3,6 +3,8 @@
Jun 21 16:47:48 digital-mlhhyiqscv sshd[13709]: error: PAM: Authentication failure for myhlj1374 from 192.030.0.6
# failJSON: { "time": "2005-05-29T20:56:52", "match": true , "host": "example.com" }
May 29 20:56:52 imago sshd[28732]: error: PAM: Authentication failure for stefanor from example.com
# failJSON: { "time": "2005-05-29T20:56:56", "match": true , "host": "2606:2800:220:1:248:1893:25c8:1946" }
May 29 20:56:56 imago sshd[28732]: error: PAM: Authentication failure for test-ipv6 from 2606:2800:220:1:248:1893:25c8:1946
#2
# failJSON: { "time": "2005-02-25T14:34:10", "match": true , "host": "194.117.26.69" }

View File

@ -458,9 +458,9 @@ class Transmitter(TransmitterBase):
"failed attempt from <HOST> again",
],
[
"user john at (?:::f{4,6}:)?(?P<host>[\w\-.^_]*\\w)",
"Admin user login from (?:::f{4,6}:)?(?P<host>[\w\-.^_]*\\w)",
"failed attempt from (?:::f{4,6}:)?(?P<host>[\w\-.^_]*\\w) again",
"user john at %s" % (Regex._resolveHostTag('<HOST>')),
"Admin user login from %s" % (Regex._resolveHostTag('<HOST>')),
"failed attempt from %s again" % (Regex._resolveHostTag('<HOST>')),
],
self.jailName
)
@ -483,7 +483,7 @@ class Transmitter(TransmitterBase):
],
[
"user john",
"Admin user login from (?:::f{4,6}:)?(?P<host>[\w\-.^_]*\\w)",
"Admin user login from %s" % (Regex._resolveHostTag('<HOST>')),
"Dont match me!",
],
self.jailName

View File

@ -101,6 +101,16 @@ def initTests(opts):
c.set('192.0.2.%s' % i, None)
c.set('198.51.100.%s' % i, None)
c.set('203.0.113.%s' % i, None)
if unittest.F2B.no_network: # pragma: no cover
# precache all wrong dns to ip's used in test cases:
c = DNSUtils.CACHE_nameToIp
for i in (
('999.999.999.999', []),
('abcdef.abcdef', []),
('192.168.0.', []),
('failed.dns.ch', []),
):
c.set(*i)
def mtimesleep():