mirror of https://github.com/fail2ban/fail2ban
samples test case factory extended with filter options - dict in JSON to control filter options (e. g. mode, etc.):
# filterOptions: {"mode": "aggressive"}pull/1710/head
parent
52ed6597b2
commit
a683e88a74
|
@ -49,12 +49,7 @@ class FilterSamplesRegex(unittest.TestCase):
|
|||
def setUp(self):
|
||||
"""Call before every test case."""
|
||||
super(FilterSamplesRegex, self).setUp()
|
||||
self.filter = Filter(None)
|
||||
self.filter.returnRawHost = True
|
||||
self.filter.checkAllRegex = True
|
||||
self.filter.checkFindTime = False
|
||||
self.filter.active = True
|
||||
|
||||
self.filter = None
|
||||
setUpMyTime()
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -83,11 +78,15 @@ class FilterSamplesRegex(unittest.TestCase):
|
|||
RE_WRONG_GREED.search('non-greedy .+? test' + RE_HOST + ' test vary catch-all .* anchored$'))
|
||||
|
||||
|
||||
def testSampleRegexsFactory(name, basedir):
|
||||
def testFilter(self):
|
||||
|
||||
def _readFilter(self, name, basedir, opts=None):
|
||||
self.filter = Filter(None)
|
||||
self.filter.returnRawHost = True
|
||||
self.filter.checkAllRegex = True
|
||||
self.filter.checkFindTime = False
|
||||
self.filter.active = True
|
||||
if opts is None: opts = dict()
|
||||
# Check filter exists
|
||||
filterConf = FilterReader(name, "jail", {},
|
||||
filterConf = FilterReader(name, "jail", opts,
|
||||
basedir=basedir, share_config=unittest.F2B.share_config)
|
||||
self.assertEqual(filterConf.getFile(), name)
|
||||
self.assertEqual(filterConf.getJailName(), "jail")
|
||||
|
@ -113,6 +112,17 @@ def testSampleRegexsFactory(name, basedir):
|
|||
elif opt[2] == "datepattern":
|
||||
self.filter.setDatePattern(optval)
|
||||
|
||||
# test regexp contains greedy catch-all before <HOST>, that is
|
||||
# not hard-anchored at end or has not precise sub expression after <HOST>:
|
||||
for fr in self.filter.getFailRegex():
|
||||
if RE_WRONG_GREED.search(fr): # pragma: no cover
|
||||
raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
|
||||
"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
|
||||
(name, str(fr).replace(RE_HOST, '<HOST>')))
|
||||
|
||||
def testSampleRegexsFactory(name, basedir):
|
||||
def testFilter(self):
|
||||
|
||||
self.assertTrue(
|
||||
os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),
|
||||
"No sample log file available for '%s' filter" % name)
|
||||
|
@ -125,22 +135,21 @@ def testSampleRegexsFactory(name, basedir):
|
|||
logFile = fileinput.FileInput(os.path.join(TEST_FILES_DIR, "logs",
|
||||
filename))
|
||||
|
||||
# test regexp contains greedy catch-all before <HOST>, that is
|
||||
# not hard-anchored at end or has not precise sub expression after <HOST>:
|
||||
for fr in self.filter.getFailRegex():
|
||||
if RE_WRONG_GREED.search(fr): # pragma: no cover
|
||||
raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
|
||||
"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
|
||||
(name, str(fr).replace(RE_HOST, '<HOST>')))
|
||||
|
||||
for line in logFile:
|
||||
jsonREMatch = re.match("^# ?(failJSON|addFILE):(.+)$", line)
|
||||
jsonREMatch = re.match("^#+ ?(failJSON|filterOptions|addFILE):(.+)$", line)
|
||||
if jsonREMatch:
|
||||
try:
|
||||
faildata = json.loads(jsonREMatch.group(2))
|
||||
# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
|
||||
if jsonREMatch.group(1) == 'filterOptions':
|
||||
self.filter = None
|
||||
self._readFilter(name, basedir, opts=faildata)
|
||||
continue
|
||||
# addFILE - filename to "include" test-files should be additionally parsed:
|
||||
if jsonREMatch.group(1) == 'addFILE':
|
||||
filenames.append(faildata)
|
||||
continue
|
||||
# failJSON - faildata contains info of the failure to check it.
|
||||
except ValueError as e:
|
||||
raise ValueError("%s: %s:%i" %
|
||||
(e, logFile.filename(), logFile.filelineno()))
|
||||
|
@ -150,6 +159,9 @@ def testSampleRegexsFactory(name, basedir):
|
|||
else:
|
||||
faildata = {}
|
||||
|
||||
if self.filter is None:
|
||||
self._readFilter(name, basedir, opts=None)
|
||||
|
||||
try:
|
||||
ret = self.filter.processLine(line)
|
||||
if not ret:
|
||||
|
|
Loading…
Reference in New Issue