extends capturing alternate tags in filter, implementing new tag prefix `<F-TUPLE_` (that would combine value of `<F-V>` with all value of <F-TUPLE_V?_n?> tags), for examples see new tests in fail2banregextestcase;

closes gh-2755 (extends #1454 and #1698).
pull/2809/head
sebres 2020-06-12 20:00:42 +02:00
parent 1da9ab78be
commit dd8081ade5
4 changed files with 57 additions and 20 deletions

View File

@ -868,7 +868,7 @@ class CommandAction(ActionBase):
tickData = aInfo.get("F-*") tickData = aInfo.get("F-*")
if not tickData: tickData = {} if not tickData: tickData = {}
def substTag(m): def substTag(m):
tag = mapTag2Opt(m.groups()[0]) tag = mapTag2Opt(m.group(1))
try: try:
value = uni_string(tickData[tag]) value = uni_string(tickData[tag])
except KeyError: except KeyError:

View File

@ -87,20 +87,24 @@ RH4TAG = {
# default failure groups map for customizable expressions (with different group-id): # default failure groups map for customizable expressions (with different group-id):
R_MAP = { R_MAP = {
"ID": "fid", "id": "fid",
"PORT": "fport", "port": "fport",
} }
def mapTag2Opt(tag): def mapTag2Opt(tag):
try: # if should be mapped: tag = tag.lower()
return R_MAP[tag] return R_MAP.get(tag, tag)
except KeyError:
return tag.lower()
# alternate names to be merged, e. g. alt_user_1 -> user ... # complex names:
# ALT_ - alternate names to be merged, e. g. alt_user_1 -> user ...
ALTNAME_PRE = 'alt_' ALTNAME_PRE = 'alt_'
ALTNAME_CRE = re.compile(r'^' + ALTNAME_PRE + r'(.*)(?:_\d+)?$') # TUPLE_ - names of parts to be combined to single value as tuple
TUPNAME_PRE = 'tuple_'
COMPLNAME_PRE = (ALTNAME_PRE, TUPNAME_PRE)
COMPLNAME_CRE = re.compile(r'^(' + '|'.join(COMPLNAME_PRE) + r')(.*?)(?:_\d+)?$')
## ##
# Regular expression class. # Regular expression class.
@ -128,18 +132,23 @@ class Regex:
self._regexObj = re.compile(regex, re.MULTILINE if multiline else 0) self._regexObj = re.compile(regex, re.MULTILINE if multiline else 0)
self._regex = regex self._regex = regex
self._altValues = {} self._altValues = {}
self._tupleValues = {}
for k in filter( for k in filter(
lambda k: len(k) > len(ALTNAME_PRE) and k.startswith(ALTNAME_PRE), lambda k: len(k) > len(COMPLNAME_PRE[0]), self._regexObj.groupindex
self._regexObj.groupindex
): ):
n = ALTNAME_CRE.match(k).group(1) n = COMPLNAME_CRE.match(k)
self._altValues[k] = n if n:
if n.group(1) == ALTNAME_PRE:
self._altValues[k] = mapTag2Opt(n.group(2))
else:
self._tupleValues[k] = mapTag2Opt(n.group(2))
self._altValues = list(self._altValues.items()) if len(self._altValues) else None self._altValues = list(self._altValues.items()) if len(self._altValues) else None
self._tupleValues = list(self._tupleValues.items()) if len(self._tupleValues) else None
except sre_constants.error: except sre_constants.error:
raise RegexException("Unable to compile regular expression '%s'" % raise RegexException("Unable to compile regular expression '%s'" %
regex) regex)
# set fetch handler depending on presence of alternate tags: # set fetch handler depending on presence of alternate tags:
self.getGroups = self._getGroupsWithAlt if self._altValues else self._getGroups self.getGroups = self._getGroupsWithAlt if (self._altValues or self._tupleValues) else self._getGroups
def __str__(self): def __str__(self):
return "%s(%r)" % (self.__class__.__name__, self._regex) return "%s(%r)" % (self.__class__.__name__, self._regex)
@ -284,12 +293,23 @@ class Regex:
def _getGroupsWithAlt(self): def _getGroupsWithAlt(self):
fail = self._matchCache.groupdict() fail = self._matchCache.groupdict()
# merge alternate values (e. g. 'alt_user_1' -> 'user' or 'alt_host' -> 'host'):
#fail = fail.copy() #fail = fail.copy()
# merge alternate values (e. g. 'alt_user_1' -> 'user' or 'alt_host' -> 'host'):
if self._altValues:
for k,n in self._altValues: for k,n in self._altValues:
v = fail.get(k) v = fail.get(k)
if v and not fail.get(n): if v and not fail.get(n):
fail[n] = v fail[n] = v
# combine tuple values (e. g. 'id', 'tuple_id' ... 'tuple_id_N' -> 'id'):
if self._tupleValues:
for k,n in self._tupleValues:
v = fail.get(k)
t = fail.get(n)
if isinstance(t, tuple):
t += (v,)
else:
t = (t,v,)
fail[n] = t
return fail return fail
def getGroups(self): # pragma: no cover - abstract function (replaced in __init__) def getGroups(self): # pragma: no cover - abstract function (replaced in __init__)

View File

@ -337,7 +337,7 @@ class IPAddr(object):
return repr(self.ntoa) return repr(self.ntoa)
def __str__(self): def __str__(self):
return self.ntoa return self.ntoa if isinstance(self.ntoa, basestring) else str(self.ntoa)
def __reduce__(self): def __reduce__(self):
"""IPAddr pickle-handler, that simply wraps IPAddr to the str """IPAddr pickle-handler, that simply wraps IPAddr to the str

View File

@ -340,6 +340,23 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertTrue(_test_exec('-o', 'id', STR_00, RE_00_ID)) self.assertTrue(_test_exec('-o', 'id', STR_00, RE_00_ID))
self.assertLogged('kevin') self.assertLogged('kevin')
self.pruneLog() self.pruneLog()
# multiple id combined to a tuple (id, tuple_id):
self.assertTrue(_test_exec('-o', 'id',
'1591983743.667 192.0.2.1 192.0.2.2',
r'^\s*<F-ID/> <F-TUPLE_ID>\S+</F-TUPLE_ID>'))
self.assertLogged(str(('192.0.2.1', '192.0.2.2')))
self.pruneLog()
# multiple id combined to a tuple, id first - (id, tuple_id_1, tuple_id_2):
self.assertTrue(_test_exec('-o', 'id',
'1591983743.667 left 192.0.2.3 right',
r'^\s*<F-TUPLE_ID_1>\S+</F-TUPLE_ID_1> <F-ID/> <F-TUPLE_ID_2>\S+</F-TUPLE_ID_2>'))
self.pruneLog()
# id had higher precedence as ip-address:
self.assertTrue(_test_exec('-o', 'id',
'1591983743.667 left [192.0.2.4]:12345 right',
r'^\s*<F-TUPLE_ID_1>\S+</F-TUPLE_ID_1> <F-ID><ADDR>:<F-PORT/></F-ID> <F-TUPLE_ID_2>\S+</F-TUPLE_ID_2>'))
self.assertLogged(str(('[192.0.2.4]:12345', 'left', 'right')))
self.pruneLog()
# row with id : # row with id :
self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_ID)) self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_ID))
self.assertLogged("['kevin'", "'ip4': '192.0.2.0'", "'fid': 'kevin'", all=True) self.assertLogged("['kevin'", "'ip4': '192.0.2.0'", "'fid': 'kevin'", all=True)