mirror of https://github.com/fail2ban/fail2ban
repairs testing of missing samples for all regex after filter settings (mode) changed
parent
482e5265d7
commit
97d417926d
|
@ -183,15 +183,12 @@ class Filter(JailThread):
|
||||||
"valid", index)
|
"valid", index)
|
||||||
|
|
||||||
##
|
##
|
||||||
# Get the regular expression which matches the failure.
|
# Get the regular expressions as list.
|
||||||
#
|
#
|
||||||
# @return the regular expression
|
# @return the regular expression list
|
||||||
|
|
||||||
def getFailRegex(self):
|
def getFailRegex(self):
|
||||||
failRegex = list()
|
return [regex.getRegex() for regex in self.__failRegex]
|
||||||
for regex in self.__failRegex:
|
|
||||||
failRegex.append(regex.getRegex())
|
|
||||||
return failRegex
|
|
||||||
|
|
||||||
##
|
##
|
||||||
# Add the regular expression which matches the failure.
|
# Add the regular expression which matches the failure.
|
||||||
|
|
|
@ -99,8 +99,8 @@ class FilterSamplesRegex(unittest.TestCase):
|
||||||
optval = opt[3]
|
optval = opt[3]
|
||||||
elif opt[0] == 'set':
|
elif opt[0] == 'set':
|
||||||
optval = [opt[3]]
|
optval = [opt[3]]
|
||||||
else:
|
else: # pragma: no cover - unexpected
|
||||||
continue
|
self.fail('Unexpected config-token %r in stream' % (opt,))
|
||||||
for optval in optval:
|
for optval in optval:
|
||||||
if opt[2] == "prefregex":
|
if opt[2] == "prefregex":
|
||||||
self.filter.prefRegex = optval
|
self.filter.prefRegex = optval
|
||||||
|
@ -115,11 +115,13 @@ class FilterSamplesRegex(unittest.TestCase):
|
||||||
|
|
||||||
# test regexp contains greedy catch-all before <HOST>, that is
|
# test regexp contains greedy catch-all before <HOST>, that is
|
||||||
# not hard-anchored at end or has not precise sub expression after <HOST>:
|
# not hard-anchored at end or has not precise sub expression after <HOST>:
|
||||||
for fr in self.filter.getFailRegex():
|
regexList = self.filter.getFailRegex()
|
||||||
|
for fr in regexList:
|
||||||
if RE_WRONG_GREED.search(fr): # pragma: no cover
|
if RE_WRONG_GREED.search(fr): # pragma: no cover
|
||||||
raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
|
raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
|
||||||
"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
|
"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
|
||||||
(name, str(fr).replace(RE_HOST, '<HOST>')))
|
(name, str(fr).replace(RE_HOST, '<HOST>')))
|
||||||
|
return regexList
|
||||||
|
|
||||||
def testSampleRegexsFactory(name, basedir):
|
def testSampleRegexsFactory(name, basedir):
|
||||||
def testFilter(self):
|
def testFilter(self):
|
||||||
|
@ -128,8 +130,17 @@ def testSampleRegexsFactory(name, basedir):
|
||||||
os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),
|
os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),
|
||||||
"No sample log file available for '%s' filter" % name)
|
"No sample log file available for '%s' filter" % name)
|
||||||
|
|
||||||
regexsUsed = set()
|
regexList = None
|
||||||
|
regexsUsedIdx = set()
|
||||||
|
regexsUsedRe = set()
|
||||||
filenames = [name]
|
filenames = [name]
|
||||||
|
|
||||||
|
def _testMissingSamples():
|
||||||
|
for failRegexIndex, failRegex in enumerate(regexList):
|
||||||
|
self.assertTrue(
|
||||||
|
failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe,
|
||||||
|
"Regex for filter '%s' has no samples: %i: %r" %
|
||||||
|
(name, failRegexIndex, failRegex))
|
||||||
i = 0
|
i = 0
|
||||||
while i < len(filenames):
|
while i < len(filenames):
|
||||||
filename = filenames[i]; i += 1;
|
filename = filenames[i]; i += 1;
|
||||||
|
@ -143,25 +154,30 @@ def testSampleRegexsFactory(name, basedir):
|
||||||
faildata = json.loads(jsonREMatch.group(2))
|
faildata = json.loads(jsonREMatch.group(2))
|
||||||
# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
|
# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
|
||||||
if jsonREMatch.group(1) == 'filterOptions':
|
if jsonREMatch.group(1) == 'filterOptions':
|
||||||
|
# another filter mode - we should check previous also:
|
||||||
|
if self.filter is not None:
|
||||||
|
_testMissingSamples()
|
||||||
|
regexsUsedIdx = set() # clear used indices (possible overlapping by mode change)
|
||||||
|
# read filter with another setting:
|
||||||
self.filter = None
|
self.filter = None
|
||||||
self._readFilter(name, basedir, opts=faildata)
|
regexList = self._readFilter(name, basedir, opts=faildata)
|
||||||
continue
|
continue
|
||||||
# addFILE - filename to "include" test-files should be additionally parsed:
|
# addFILE - filename to "include" test-files should be additionally parsed:
|
||||||
if jsonREMatch.group(1) == 'addFILE':
|
if jsonREMatch.group(1) == 'addFILE':
|
||||||
filenames.append(faildata)
|
filenames.append(faildata)
|
||||||
continue
|
continue
|
||||||
# failJSON - faildata contains info of the failure to check it.
|
# failJSON - faildata contains info of the failure to check it.
|
||||||
except ValueError as e:
|
except ValueError as e: # pragma: no cover - we've valid json's
|
||||||
raise ValueError("%s: %s:%i" %
|
raise ValueError("%s: %s:%i" %
|
||||||
(e, logFile.filename(), logFile.filelineno()))
|
(e, logFile.filename(), logFile.filelineno()))
|
||||||
line = next(logFile)
|
line = next(logFile)
|
||||||
elif line.startswith("#") or not line.strip():
|
elif line.startswith("#") or not line.strip():
|
||||||
continue
|
continue
|
||||||
else:
|
else: # pragma: no cover - normally unreachable
|
||||||
faildata = {}
|
faildata = {}
|
||||||
|
|
||||||
if self.filter is None:
|
if self.filter is None:
|
||||||
self._readFilter(name, basedir, opts=None)
|
regexList = self._readFilter(name, basedir, opts=None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ret = self.filter.processLine(line)
|
ret = self.filter.processLine(line)
|
||||||
|
@ -174,7 +190,8 @@ def testSampleRegexsFactory(name, basedir):
|
||||||
failregex, fid, fail2banTime, fail = ret[0]
|
failregex, fid, fail2banTime, fail = ret[0]
|
||||||
# Bypass no failure helpers-regexp:
|
# Bypass no failure helpers-regexp:
|
||||||
if not faildata.get('match', False) and (fid is None or fail.get('nofail')):
|
if not faildata.get('match', False) and (fid is None or fail.get('nofail')):
|
||||||
regexsUsed.add(failregex)
|
regexsUsedIdx.add(failregex)
|
||||||
|
regexsUsedRe.add(regexList[failregex])
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check line is flagged to match
|
# Check line is flagged to match
|
||||||
|
@ -208,16 +225,13 @@ def testSampleRegexsFactory(name, basedir):
|
||||||
jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
|
jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
|
||||||
fail2banTime - jsonTime) )
|
fail2banTime - jsonTime) )
|
||||||
|
|
||||||
regexsUsed.add(failregex)
|
regexsUsedIdx.add(failregex)
|
||||||
|
regexsUsedRe.add(regexList[failregex])
|
||||||
except AssertionError as e: # pragma: no cover
|
except AssertionError as e: # pragma: no cover
|
||||||
raise AssertionError("%s on: %s:%i, line:\n%s" % (
|
raise AssertionError("%s on: %s:%i, line:\n%s" % (
|
||||||
e, logFile.filename(), logFile.filelineno(), line))
|
e, logFile.filename(), logFile.filelineno(), line))
|
||||||
|
|
||||||
for failRegexIndex, failRegex in enumerate(self.filter.getFailRegex()):
|
_testMissingSamples()
|
||||||
self.assertTrue(
|
|
||||||
failRegexIndex in regexsUsed,
|
|
||||||
"Regex for filter '%s' has no samples: %i: %r" %
|
|
||||||
(name, failRegexIndex, failRegex))
|
|
||||||
|
|
||||||
return testFilter
|
return testFilter
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue