mirror of https://github.com/fail2ban/fail2ban
separated host match group and tags for ip4, ip6, dns, fid (failure-id):
- better recognition for usage of textToIp, expected or raw value should be used; - separated failure id vs. host (if found use `fid` instead of `host` resp. `ip`); - additional optional groups may be used in tags replacement by executing actions;pull/1459/head
parent
39366e703a
commit
d344274271
|
@ -78,9 +78,9 @@ class FailManager:
|
|||
def addFailure(self, ticket, count=1):
|
||||
attempts = 1
|
||||
with self.__lock:
|
||||
ip = ticket.getIP()
|
||||
fid = ticket.getID()
|
||||
try:
|
||||
fData = self.__failList[ip]
|
||||
fData = self.__failList[fid]
|
||||
# if the same object - the same matches but +1 attempt:
|
||||
if fData is ticket:
|
||||
matches = None
|
||||
|
@ -109,7 +109,7 @@ class FailManager:
|
|||
fData = FailTicket(ticket=ticket)
|
||||
if count > ticket.getAttempt():
|
||||
fData.setRetry(count)
|
||||
self.__failList[ip] = fData
|
||||
self.__failList[fid] = fData
|
||||
|
||||
attempts = fData.getRetry()
|
||||
self.__failTotal += 1
|
||||
|
@ -132,7 +132,7 @@ class FailManager:
|
|||
|
||||
def cleanup(self, time):
|
||||
with self.__lock:
|
||||
todelete = [ip for ip,item in self.__failList.iteritems() \
|
||||
todelete = [fid for fid,item in self.__failList.iteritems() \
|
||||
if item.getLastTime() + self.__maxTime <= time]
|
||||
if len(todelete) == len(self.__failList):
|
||||
# remove all:
|
||||
|
@ -142,27 +142,27 @@ class FailManager:
|
|||
return
|
||||
if len(todelete) / 2.0 <= len(self.__failList) / 3.0:
|
||||
# few as 2/3 should be removed - remove particular items:
|
||||
for ip in todelete:
|
||||
del self.__failList[ip]
|
||||
for fid in todelete:
|
||||
del self.__failList[fid]
|
||||
else:
|
||||
# create new dictionary without items to be deleted:
|
||||
self.__failList = dict((ip,item) for ip,item in self.__failList.iteritems() \
|
||||
self.__failList = dict((fid,item) for fid,item in self.__failList.iteritems() \
|
||||
if item.getLastTime() + self.__maxTime > time)
|
||||
self.__bgSvc.service()
|
||||
|
||||
def delFailure(self, ip):
|
||||
def delFailure(self, fid):
|
||||
with self.__lock:
|
||||
try:
|
||||
del self.__failList[ip]
|
||||
del self.__failList[fid]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def toBan(self, ip=None):
|
||||
def toBan(self, fid=None):
|
||||
with self.__lock:
|
||||
for ip in ([ip] if ip != None and ip in self.__failList else self.__failList):
|
||||
data = self.__failList[ip]
|
||||
for fid in ([fid] if fid != None and fid in self.__failList else self.__failList):
|
||||
data = self.__failList[fid]
|
||||
if data.getRetry() >= self.__maxRetry:
|
||||
del self.__failList[ip]
|
||||
del self.__failList[fid]
|
||||
return data
|
||||
self.__bgSvc.service()
|
||||
raise FailManagerEmpty
|
||||
|
|
|
@ -62,18 +62,39 @@ class Regex:
|
|||
def __str__(self):
|
||||
return "%s(%r)" % (self.__class__.__name__, self._regex)
|
||||
|
||||
##
|
||||
# Replaces "<HOST>", "<IP4>", "<IP6>", "<FID>" with default regular expression for host
|
||||
#
|
||||
# (see gh-1374 for the discussion about other candidates)
|
||||
# @return the replaced regular expression as string
|
||||
|
||||
@staticmethod
|
||||
def _resolveHostTag(regex):
|
||||
# Replace "<HOST>" with default regular expression for host:
|
||||
# Other candidates (see gh-1374 for the discussion about):
|
||||
# differentiate: r"""(?:(?:::f{4,6}:)?(?P<IPv4>(?:\d{1,3}\.){3}\d{1,3})|\[?(?P<IPv6>(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):))\]?|(?P<HOST>[\w\-.^_]*\w))"""
|
||||
# expected many changes in filter, failregex, etc...
|
||||
# simple: r"""(?:::f{4,6}:)?(?P<host>[\w\-.^_:]*\w)"""
|
||||
# not good enough, if not precise expressions around <HOST>, because for example will match '1.2.3.4:23930' as ip-address;
|
||||
# Todo: move this functionality to filter reader, as default <HOST> replacement,
|
||||
# make it configurable (via jail/filter configs)
|
||||
return regex.replace("<HOST>",
|
||||
r"""(?:::f{4,6}:)?(?P<host>(?:\d{1,3}\.){3}\d{1,3}|\[?(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}\]?|(?<=:):)|[\w\-.^_]*\w)""")
|
||||
# 3 groups instead of <HOST> - separated ipv4, ipv6 and host
|
||||
regex = regex.replace("<HOST>",
|
||||
r"""(?:(?:::f{4,6}:)?(?P<ip4>(?:\d{1,3}\.){3}\d{1,3})|\[?(?P<ip6>(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}|(?<=:):))\]?|(?P<dns>[\w\-.^_]*\w))""")
|
||||
# separated ipv4:
|
||||
r = r"""(?:::f{4,6}:)?(?P<ip4>(?:\d{1,3}\.){3}\d{1,3})"""
|
||||
regex = regex.replace("<IP4>", r); # self closed
|
||||
regex = regex.replace("<F-IP4/>", r); # closed
|
||||
# separated ipv6:
|
||||
r = r"""(?P<ip6>(?:[0-9a-fA-F]{1,4}::?|::){1,7}(?:[0-9a-fA-F]{1,4}?|(?<=:):))"""
|
||||
regex = regex.replace("<IP6>", r); # self closed
|
||||
regex = regex.replace("<F-IP6/>", r); # closed
|
||||
# separated dns:
|
||||
r = r"""(?P<dns>[\w\-.^_]*\w)"""
|
||||
regex = regex.replace("<DNS>", r); # self closed
|
||||
regex = regex.replace("<F-DNS/>", r); # closed
|
||||
# default failure-id as no space tag:
|
||||
regex = regex.replace("<F-ID/>", r"""(?P<fid>\S+)"""); # closed
|
||||
# default failure port, like 80 or http :
|
||||
regex = regex.replace("<F-PORT/>", r"""(?P<port>\w+)"""); # closed
|
||||
# default failure groups (begin / end tag) for customizable expressions:
|
||||
for o,r in (('IP4', 'ip4'), ('IP6', 'ip6'), ('DNS', 'dns'), ('ID', 'fid'), ('PORT', 'fport')):
|
||||
regex = regex.replace("<F-%s>" % o, "(?P<%s>" % r); # open tag
|
||||
regex = regex.replace("</F-%s>" % o, ")"); # close tag
|
||||
|
||||
return regex
|
||||
|
||||
##
|
||||
# Gets the regular expression.
|
||||
|
@ -207,6 +228,13 @@ class RegexException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
##
|
||||
# Groups used as failure identifier.
|
||||
#
|
||||
# The order of this tuple is important while searching for failure-id
|
||||
#
|
||||
FAILURE_ID_GROPS = ("fid", "ip4", "ip6", "dns")
|
||||
|
||||
##
|
||||
# Regular expression class.
|
||||
#
|
||||
|
@ -224,21 +252,44 @@ class FailRegex(Regex):
|
|||
def __init__(self, regex):
|
||||
# Initializes the parent.
|
||||
Regex.__init__(self, regex)
|
||||
# Check for group "host"
|
||||
if "host" not in self._regexObj.groupindex:
|
||||
raise RegexException("No 'host' group in '%s'" % self._regex)
|
||||
# Check for group "dns", "ip4", "ip6", "fid"
|
||||
if not [grp for grp in FAILURE_ID_GROPS if grp in self._regexObj.groupindex]:
|
||||
raise RegexException("No failure-id group in '%s'" % self._regex)
|
||||
|
||||
##
|
||||
# Returns all matched groups.
|
||||
#
|
||||
|
||||
def getGroups(self):
|
||||
return self._matchCache.groupdict()
|
||||
|
||||
##
|
||||
# Returns the matched failure id.
|
||||
#
|
||||
# This corresponds to the pattern matched by the named group from given groups.
|
||||
# @return the matched failure-id
|
||||
|
||||
def getFailID(self, groups=FAILURE_ID_GROPS):
|
||||
fid = None
|
||||
for grp in groups:
|
||||
try:
|
||||
fid = self._matchCache.group(grp)
|
||||
except IndexError:
|
||||
continue
|
||||
if fid is not None:
|
||||
break
|
||||
if fid is None:
|
||||
# Gets a few information.
|
||||
s = self._matchCache.string
|
||||
r = self._matchCache.re
|
||||
raise RegexException("No group found in '%s' using '%s'" % (s, r))
|
||||
return str(fid)
|
||||
|
||||
##
|
||||
# Returns the matched host.
|
||||
#
|
||||
# This corresponds to the pattern matched by the named group "host".
|
||||
# This corresponds to the pattern matched by the named group "ip4", "ip6" or "dns".
|
||||
# @return the matched host
|
||||
|
||||
def getHost(self):
|
||||
host = self._matchCache.group("host")
|
||||
if host is None:
|
||||
# Gets a few information.
|
||||
s = self._matchCache.string
|
||||
r = self._matchCache.re
|
||||
raise RegexException("No 'host' found in '%s' using '%s'" % (s, r))
|
||||
return str(host)
|
||||
return self.getFailID(("ip4", "ip6", "dns"))
|
||||
|
|
|
@ -418,6 +418,9 @@ class Filter(JailThread):
|
|||
ip = element[1]
|
||||
unixTime = element[2]
|
||||
lines = element[3]
|
||||
fail = {}
|
||||
if len(element) > 4:
|
||||
fail = element[4]
|
||||
logSys.debug("Processing line with time:%s and ip:%s",
|
||||
unixTime, ip)
|
||||
if unixTime < MyTime.time() - self.getFindTime():
|
||||
|
@ -429,7 +432,7 @@ class Filter(JailThread):
|
|||
logSys.info(
|
||||
"[%s] Found %s - %s", self.jail.name, ip, datetime.datetime.fromtimestamp(unixTime).strftime("%Y-%m-%d %H:%M:%S")
|
||||
)
|
||||
tick = FailTicket(ip, unixTime, lines)
|
||||
tick = FailTicket(ip, unixTime, lines, data=fail)
|
||||
self.failManager.addFailure(tick)
|
||||
|
||||
##
|
||||
|
@ -457,7 +460,12 @@ class Filter(JailThread):
|
|||
checkAllRegex=False):
|
||||
failList = list()
|
||||
|
||||
# Checks if we must ignore this line.
|
||||
cidr = IPAddr.CIDR_UNSPEC
|
||||
if self.__useDns == "raw":
|
||||
returnRawHost = True
|
||||
cidr = IPAddr.CIDR_RAW
|
||||
|
||||
# Checks if we mut ignore this line.
|
||||
if self.ignoreLine([tupleLine[::2]]) is not None:
|
||||
# The ignoreregex matched. Return.
|
||||
logSys.log(7, "Matched ignoreregex and was \"%s\" ignored",
|
||||
|
@ -518,19 +526,45 @@ class Filter(JailThread):
|
|||
% ("\n".join(failRegex.getMatchedLines()), timeText))
|
||||
else:
|
||||
self.__lineBuffer = failRegex.getUnmatchedTupleLines()
|
||||
# retrieve failure-id, host, etc from failure match:
|
||||
raw = returnRawHost
|
||||
try:
|
||||
host = failRegex.getHost()
|
||||
if returnRawHost or self.__useDns == "raw":
|
||||
failList.append([failRegexIndex, IPAddr(host), date,
|
||||
failRegex.getMatchedLines()])
|
||||
fail = failRegex.getGroups()
|
||||
# failure-id:
|
||||
fid = fail.get('fid')
|
||||
# ip-address or host:
|
||||
host = fail.get('ip4')
|
||||
if host is not None:
|
||||
raw = True
|
||||
else:
|
||||
host = fail.get('ip6')
|
||||
if host is not None:
|
||||
raw = True
|
||||
else:
|
||||
host = fail.get('dns')
|
||||
if host is None:
|
||||
# if no failure-id also (obscure case, wrong regex), throw error inside getFailID:
|
||||
if fid is None:
|
||||
fid = failRegex.getFailID()
|
||||
host = fid
|
||||
cidr = IPAddr.CIDR_RAW
|
||||
# if raw - add single ip or failure-id,
|
||||
# otherwise expand host to multiple ips using dns (or ignore it if not valid):
|
||||
if raw:
|
||||
ip = IPAddr(host, cidr)
|
||||
# check host equal failure-id, if not - failure with complex id:
|
||||
if fid is not None and fid != host:
|
||||
ip = IPAddr(fid, IPAddr.CIDR_RAW)
|
||||
failList.append([failRegexIndex, ip, date,
|
||||
failRegex.getMatchedLines(), fail])
|
||||
if not checkAllRegex:
|
||||
break
|
||||
else:
|
||||
ips = DNSUtils.textToIp(host, self.__useDns)
|
||||
if ips:
|
||||
for ip in ips:
|
||||
failList.append([failRegexIndex, ip,
|
||||
date, failRegex.getMatchedLines()])
|
||||
failList.append([failRegexIndex, ip, date,
|
||||
failRegex.getMatchedLines(), fail])
|
||||
if not checkAllRegex:
|
||||
break
|
||||
except RegexException, e: # pragma: no cover - unsure if reachable
|
||||
|
|
|
@ -138,18 +138,21 @@ class IPAddr(object):
|
|||
# todo: make configurable the expired time and max count of cache entries:
|
||||
CACHE_OBJ = Utils.Cache(maxCount=1000, maxTime=5*60)
|
||||
|
||||
def __new__(cls, ipstr, cidr=-1):
|
||||
CIDR_RAW = -2
|
||||
CIDR_UNSPEC = -1
|
||||
|
||||
def __new__(cls, ipstr, cidr=CIDR_UNSPEC):
|
||||
# check already cached as IPAddr
|
||||
args = (ipstr, cidr)
|
||||
ip = IPAddr.CACHE_OBJ.get(args)
|
||||
if ip is not None:
|
||||
return ip
|
||||
# wrap mask to cidr (correct plen):
|
||||
if cidr == -1:
|
||||
if cidr == IPAddr.CIDR_UNSPEC:
|
||||
ipstr, cidr = IPAddr.__wrap_ipstr(ipstr)
|
||||
args = (ipstr, cidr)
|
||||
# check cache again:
|
||||
if cidr != -1:
|
||||
if cidr != IPAddr.CIDR_UNSPEC:
|
||||
ip = IPAddr.CACHE_OBJ.get(args)
|
||||
if ip is not None:
|
||||
return ip
|
||||
|
@ -166,7 +169,7 @@ class IPAddr(object):
|
|||
ipstr = ipstr[1:-1]
|
||||
# test mask:
|
||||
if "/" not in ipstr:
|
||||
return ipstr, -1
|
||||
return ipstr, IPAddr.CIDR_UNSPEC
|
||||
s = ipstr.split('/', 1)
|
||||
# IP address without CIDR mask
|
||||
if len(s) > 2:
|
||||
|
@ -176,7 +179,7 @@ class IPAddr(object):
|
|||
s[1] = long(s[1])
|
||||
return s
|
||||
|
||||
def __init(self, ipstr, cidr=-1):
|
||||
def __init(self, ipstr, cidr=CIDR_UNSPEC):
|
||||
""" initialize IP object by converting IP address string
|
||||
to binary to integer
|
||||
"""
|
||||
|
@ -184,49 +187,48 @@ class IPAddr(object):
|
|||
self._addr = 0
|
||||
self._plen = 0
|
||||
self._maskplen = None
|
||||
self._raw = ""
|
||||
# always save raw value (normally used if really raw or not valid only):
|
||||
self._raw = ipstr
|
||||
# if not raw - recognize family, set addr, etc.:
|
||||
if cidr != IPAddr.CIDR_RAW:
|
||||
for family in [socket.AF_INET, socket.AF_INET6]:
|
||||
try:
|
||||
binary = socket.inet_pton(family, ipstr)
|
||||
self._family = family
|
||||
break
|
||||
except socket.error:
|
||||
continue
|
||||
|
||||
for family in [socket.AF_INET, socket.AF_INET6]:
|
||||
try:
|
||||
binary = socket.inet_pton(family, ipstr)
|
||||
self._family = family
|
||||
break
|
||||
except socket.error:
|
||||
continue
|
||||
|
||||
if self._family == socket.AF_INET:
|
||||
# convert host to network byte order
|
||||
self._addr, = struct.unpack("!L", binary)
|
||||
self._plen = 32
|
||||
|
||||
# mask out host portion if prefix length is supplied
|
||||
if cidr is not None and cidr >= 0:
|
||||
mask = ~(0xFFFFFFFFL >> cidr)
|
||||
self._addr &= mask
|
||||
self._plen = cidr
|
||||
|
||||
elif self._family == socket.AF_INET6:
|
||||
# convert host to network byte order
|
||||
hi, lo = struct.unpack("!QQ", binary)
|
||||
self._addr = (hi << 64) | lo
|
||||
self._plen = 128
|
||||
|
||||
# mask out host portion if prefix length is supplied
|
||||
if cidr is not None and cidr >= 0:
|
||||
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> cidr)
|
||||
self._addr &= mask
|
||||
self._plen = cidr
|
||||
|
||||
# if IPv6 address is a IPv4-compatible, make instance a IPv4
|
||||
elif self.isInNet(IPAddr.IP6_4COMPAT):
|
||||
self._addr = lo & 0xFFFFFFFFL
|
||||
self._family = socket.AF_INET
|
||||
if self._family == socket.AF_INET:
|
||||
# convert host to network byte order
|
||||
self._addr, = struct.unpack("!L", binary)
|
||||
self._plen = 32
|
||||
|
||||
# mask out host portion if prefix length is supplied
|
||||
if cidr is not None and cidr >= 0:
|
||||
mask = ~(0xFFFFFFFFL >> cidr)
|
||||
self._addr &= mask
|
||||
self._plen = cidr
|
||||
|
||||
elif self._family == socket.AF_INET6:
|
||||
# convert host to network byte order
|
||||
hi, lo = struct.unpack("!QQ", binary)
|
||||
self._addr = (hi << 64) | lo
|
||||
self._plen = 128
|
||||
|
||||
# mask out host portion if prefix length is supplied
|
||||
if cidr is not None and cidr >= 0:
|
||||
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> cidr)
|
||||
self._addr &= mask
|
||||
self._plen = cidr
|
||||
|
||||
# if IPv6 address is a IPv4-compatible, make instance a IPv4
|
||||
elif self.isInNet(IPAddr.IP6_4COMPAT):
|
||||
self._addr = lo & 0xFFFFFFFFL
|
||||
self._family = socket.AF_INET
|
||||
self._plen = 32
|
||||
else:
|
||||
# string couldn't be converted neither to a IPv4 nor
|
||||
# to a IPv6 address - retain raw input for later use
|
||||
# (e.g. DNS resolution)
|
||||
self._raw = ipstr
|
||||
self._family = IPAddr.CIDR_RAW
|
||||
|
||||
def __repr__(self):
|
||||
return self.ntoa
|
||||
|
@ -270,6 +272,8 @@ class IPAddr(object):
|
|||
return self._family != socket.AF_UNSPEC
|
||||
|
||||
def __eq__(self, other):
|
||||
if self._family == IPAddr.CIDR_RAW and not isinstance(other, IPAddr):
|
||||
return self._raw == other
|
||||
if not isinstance(other, IPAddr):
|
||||
if other is None: return False
|
||||
other = IPAddr(other)
|
||||
|
@ -285,6 +289,8 @@ class IPAddr(object):
|
|||
return not (self == other)
|
||||
|
||||
def __lt__(self, other):
|
||||
if self._family == IPAddr.CIDR_RAW and not isinstance(other, IPAddr):
|
||||
return self._raw < other
|
||||
if not isinstance(other, IPAddr):
|
||||
if other is None: return False
|
||||
other = IPAddr(other)
|
||||
|
|
|
@ -36,7 +36,7 @@ logSys = getLogger(__name__)
|
|||
|
||||
class Ticket:
|
||||
|
||||
def __init__(self, ip=None, time=None, matches=None, ticket=None):
|
||||
def __init__(self, ip=None, time=None, matches=None, data={}, ticket=None):
|
||||
"""Ticket constructor
|
||||
|
||||
@param ip the IP address
|
||||
|
@ -50,6 +50,7 @@ class Ticket:
|
|||
self._banTime = None;
|
||||
self._time = time if time is not None else MyTime.time()
|
||||
self._data = {'matches': [], 'failures': 0}
|
||||
self._data.update(data)
|
||||
if ticket:
|
||||
# ticket available - copy whole information from ticket:
|
||||
self.__dict__.update(i for i in ticket.__dict__.iteritems() if i[0] in self.__dict__)
|
||||
|
@ -78,6 +79,9 @@ class Ticket:
|
|||
value = IPAddr(value)
|
||||
self.__ip = value
|
||||
|
||||
def getID(self):
|
||||
return self._data.get('fid', self.__ip)
|
||||
|
||||
def getIP(self):
|
||||
return self.__ip
|
||||
|
||||
|
@ -164,12 +168,12 @@ class Ticket:
|
|||
|
||||
class FailTicket(Ticket):
|
||||
|
||||
def __init__(self, ip=None, time=None, matches=None, ticket=None):
|
||||
def __init__(self, ip=None, time=None, matches=None, data={}, ticket=None):
|
||||
# this class variables:
|
||||
self.__retry = 0
|
||||
self.__lastReset = None
|
||||
# create/copy using default ticket constructor:
|
||||
Ticket.__init__(self, ip, time, matches, ticket)
|
||||
Ticket.__init__(self, ip, time, matches, data, ticket)
|
||||
# init:
|
||||
if ticket is None:
|
||||
self.__lastReset = time if time is not None else self.getTime()
|
||||
|
|
|
@ -127,7 +127,7 @@ def testSampleRegexsFactory(name, basedir):
|
|||
(map(lambda x: x[0], ret),logFile.filename(), logFile.filelineno()))
|
||||
|
||||
# Verify timestamp and host as expected
|
||||
failregex, host, fail2banTime, lines = ret[0]
|
||||
failregex, host, fail2banTime, lines, fail = ret[0]
|
||||
self.assertEqual(host, faildata.get("host", None))
|
||||
|
||||
t = faildata.get("time", None)
|
||||
|
|
|
@ -933,6 +933,14 @@ class RegexTests(unittest.TestCase):
|
|||
|
||||
def testHost(self):
|
||||
self.assertRaises(RegexException, FailRegex, '')
|
||||
self.assertRaises(RegexException, FailRegex, '^test no group$')
|
||||
self.assertTrue(FailRegex('^test <HOST> group$'))
|
||||
self.assertTrue(FailRegex('^test <IP4> group$'))
|
||||
self.assertTrue(FailRegex('^test <IP6> group$'))
|
||||
self.assertTrue(FailRegex('^test <DNS> group$'))
|
||||
self.assertTrue(FailRegex('^test id group: ip:port = <F-ID><IP4>(?::<F-PORT/>)?</F-ID>$'))
|
||||
self.assertTrue(FailRegex('^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$'))
|
||||
self.assertTrue(FailRegex('^test id group: anything = <F-ID/>$'))
|
||||
# Testing obscure case when host group might be missing in the matched pattern,
|
||||
# e.g. if we made it optional.
|
||||
fr = FailRegex('%%<HOST>?')
|
||||
|
@ -940,6 +948,30 @@ class RegexTests(unittest.TestCase):
|
|||
fr.search([('%%',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertRaises(RegexException, fr.getHost)
|
||||
# The same as above but using separated IPv4/IPv6 expressions
|
||||
fr = FailRegex('%%inet(?:=<F-IP4/>|inet6=<F-IP6/>)?')
|
||||
self.assertFalse(fr.hasMatched())
|
||||
fr.search([('%%inet=test',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertRaises(RegexException, fr.getHost)
|
||||
# Success case: using separated IPv4/IPv6 expressions (no HOST)
|
||||
fr = FailRegex('%%(?:inet(?:=<IP4>|6=<IP6>)?|dns=<DNS>?)')
|
||||
self.assertFalse(fr.hasMatched())
|
||||
fr.search([('%%inet=192.0.2.1',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertEqual(fr.getHost(), '192.0.2.1')
|
||||
fr.search([('%%inet6=2001:DB8::',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertEqual(fr.getHost(), '2001:DB8::')
|
||||
fr.search([('%%dns=example.com',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertEqual(fr.getHost(), 'example.com')
|
||||
# Success case: using user as failure-id
|
||||
fr = FailRegex('^test id group: user:\(<F-ID>[^\)]+</F-ID>\)$')
|
||||
self.assertFalse(fr.hasMatched())
|
||||
fr.search([('test id group: user:(test login name)',"","")])
|
||||
self.assertTrue(fr.hasMatched())
|
||||
self.assertEqual(fr.getFailID(), 'test login name')
|
||||
|
||||
|
||||
class _BadThread(JailThread):
|
||||
|
@ -998,6 +1030,24 @@ class ServerConfigReaderTests(LogCaptureTestCase):
|
|||
self.assertTrue(IPAddr('192.0.2.1').isIPv4)
|
||||
self.assertTrue(IPAddr('2001:DB8::').isIPv6)
|
||||
|
||||
def test_IPAddr_Raw(self):
|
||||
# raw string:
|
||||
r = IPAddr('xxx', IPAddr.CIDR_RAW)
|
||||
self.assertFalse(r.isIPv4)
|
||||
self.assertFalse(r.isIPv6)
|
||||
self.assertTrue(r.isValid)
|
||||
self.assertEqual(r, 'xxx')
|
||||
self.assertEqual('xxx', str(r))
|
||||
self.assertNotEqual(r, IPAddr('xxx'))
|
||||
# raw (not IP, for example host:port as string):
|
||||
r = IPAddr('1:2', IPAddr.CIDR_RAW)
|
||||
self.assertFalse(r.isIPv4)
|
||||
self.assertFalse(r.isIPv6)
|
||||
self.assertTrue(r.isValid)
|
||||
self.assertEqual(r, '1:2')
|
||||
self.assertEqual('1:2', str(r))
|
||||
self.assertNotEqual(r, IPAddr('1:2'))
|
||||
|
||||
def _testExecActions(self, server):
|
||||
jails = server._Server__jails
|
||||
for jail in jails:
|
||||
|
|
Loading…
Reference in New Issue