extractOptions: ensure options are parsed completely - avoids unexpected skip or truncate of parameters, produces more verbose error message in case of incorrect syntax; added more tests covering several cases

WARN: potential incompatibility (since it doesn't silently ignore wrong syntax anymore)
pull/1833/merge
sebres 2021-02-03 14:45:30 +01:00
parent c75748c5d3
commit 366c64cb9d
5 changed files with 57 additions and 33 deletions

View File

@ -35,6 +35,7 @@ __license__ = "GPL"
import getopt
import logging
import re
import os
import shlex
import sys
@ -329,26 +330,33 @@ class Fail2banRegex(object):
regex = regextype + 'regex'
# try to check - we've case filter?[options...]?:
basedir = self._opts.config
fltName = value
fltFile = None
fltOpt = {}
if regextype == 'fail':
fltName, fltOpt = extractOptions(value)
if fltName is not None:
if "." in fltName[~5:]:
tryNames = (fltName,)
else:
tryNames = (fltName, fltName + '.conf', fltName + '.local')
for fltFile in tryNames:
if not "/" in fltFile:
if os.path.basename(basedir) == 'filter.d':
fltFile = os.path.join(basedir, fltFile)
else:
fltFile = os.path.join(basedir, 'filter.d', fltFile)
if re.search(r'^/{0,3}[\w/_\-.]+(?:\[.*\])?$', value):
try:
fltName, fltOpt = extractOptions(value)
if "." in fltName[~5:]:
tryNames = (fltName,)
else:
basedir = os.path.dirname(fltFile)
if os.path.isfile(fltFile):
break
fltFile = None
tryNames = (fltName, fltName + '.conf', fltName + '.local')
for fltFile in tryNames:
if not "/" in fltFile:
if os.path.basename(basedir) == 'filter.d':
fltFile = os.path.join(basedir, fltFile)
else:
fltFile = os.path.join(basedir, 'filter.d', fltFile)
else:
basedir = os.path.dirname(fltFile)
if os.path.isfile(fltFile):
break
fltFile = None
except Exception as e:
output("ERROR: Wrong filter name or options: %s" % (str(e),))
output(" while parsing: %s" % (value,))
if self._verbose: raise(e)
return False
# if it is filter file:
if fltFile is not None:
if (basedir == self._opts.config

View File

@ -133,9 +133,10 @@ class JailReader(ConfigReader):
# Read filter
flt = self.__opts["filter"]
if flt:
filterName, filterOpt = extractOptions(flt)
if not filterName:
raise JailDefError("Invalid filter definition %r" % flt)
try:
filterName, filterOpt = extractOptions(flt)
except ValueError as e:
raise JailDefError("Invalid filter definition %r: %s" % (flt, e))
self.__filter = FilterReader(
filterName, self.__name, filterOpt,
share_config=self.share_config, basedir=self.getBaseDir())
@ -167,10 +168,10 @@ class JailReader(ConfigReader):
if not act: # skip empty actions
continue
# join with previous line if needed (consider possible new-line):
actName, actOpt = extractOptions(act)
prevln = ''
if not actName:
raise JailDefError("Invalid action definition %r" % act)
try:
actName, actOpt = extractOptions(act)
except ValueError as e:
raise JailDefError("Invalid action definition %r: %s" % (act, e))
if actName.endswith(".py"):
self.__actions.append([
"set",

View File

@ -371,7 +371,7 @@ OPTION_CRE = re.compile(r"^([^\[]+)(?:\[(.*)\])?\s*$", re.DOTALL)
# since v0.10 separator extended with `]\s*[` for support of multiple option groups, syntax
# `action = act[p1=...][p2=...]`
OPTION_EXTRACT_CRE = re.compile(
r'([\w\-_\.]+)=(?:"([^"]*)"|\'([^\']*)\'|([^,\]]*))(?:,|\]\s*\[|$)', re.DOTALL)
r'\s*([\w\-_\.]+)=(?:"([^"]*)"|\'([^\']*)\'|([^,\]]*))(?:,|\]\s*\[|$|(?P<wrngA>.+))|,?\s*$|(?P<wrngB>.+)', re.DOTALL)
# split by new-line considering possible new-lines within options [...]:
OPTION_SPLIT_CRE = re.compile(
r'(?:[^\[\s]+(?:\s*\[\s*(?:[\w\-_\.]+=(?:"[^"]*"|\'[^\']*\'|[^,\]]*)\s*(?:,|\]\s*\[)?\s*)*\])?\s*|\S+)(?=\n\s*|\s+|$)', re.DOTALL)
@ -379,13 +379,19 @@ OPTION_SPLIT_CRE = re.compile(
def extractOptions(option):
match = OPTION_CRE.match(option)
if not match:
# TODO proper error handling
return None, None
raise ValueError("unexpected option syntax")
option_name, optstr = match.groups()
option_opts = dict()
if optstr:
for optmatch in OPTION_EXTRACT_CRE.finditer(optstr):
if optmatch.group("wrngA"):
raise ValueError("unexpected syntax at %d after option %r: %s" % (
optmatch.start("wrngA"), optmatch.group(1), optmatch.group("wrngA")[0:25]))
if optmatch.group("wrngB"):
raise ValueError("expected option, wrong syntax at %d: %s" % (
optmatch.start("wrngB"), optmatch.group("wrngB")[0:25]))
opt = optmatch.group(1)
if not opt: continue
value = [
val for val in optmatch.group(2,3,4) if val is not None][0]
option_opts[opt.strip()] = value.strip()

View File

@ -381,13 +381,16 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(('mail.who_is', {'a':'cat', 'b':'dog'}), extractOptions("mail.who_is[a=cat,b=dog]"))
self.assertEqual(('mail--ho_is', {}), extractOptions("mail--ho_is"))
self.assertEqual(('mail--ho_is', {}), extractOptions("mail--ho_is['s']"))
#print(self.getLog())
#self.assertLogged("Invalid argument ['s'] in ''s''")
self.assertEqual(('mail', {'a': ','}), extractOptions("mail[a=',']"))
self.assertEqual(('mail', {'a': 'b'}), extractOptions("mail[a=b, ]"))
#self.assertRaises(ValueError, extractOptions ,'mail-how[')
self.assertRaises(ValueError, extractOptions ,'mail-how[')
self.assertRaises(ValueError, extractOptions, """mail[a="test with interim (wrong) "" quotes"]""")
self.assertRaises(ValueError, extractOptions, """mail[a='test with interim (wrong) '' quotes']""")
self.assertRaises(ValueError, extractOptions, """mail[a='x, y, z', b=x, y, z]""")
self.assertRaises(ValueError, extractOptions, """mail['s']""")
# Empty option
option = "abc[]"
@ -752,9 +755,9 @@ class JailsReaderTest(LogCaptureTestCase):
['add', 'tz_correct', 'auto'],
['start', 'tz_correct'],
['config-error',
"Jail 'brokenactiondef' skipped, because of wrong configuration: Invalid action definition 'joho[foo'"],
"Jail 'brokenactiondef' skipped, because of wrong configuration: Invalid action definition 'joho[foo': unexpected option syntax"],
['config-error',
"Jail 'brokenfilterdef' skipped, because of wrong configuration: Invalid filter definition 'flt[test'"],
"Jail 'brokenfilterdef' skipped, because of wrong configuration: Invalid filter definition 'flt[test': unexpected option syntax"],
['config-error',
"Jail 'missingaction' skipped, because of wrong configuration: Unable to read action 'noactionfileforthisaction'"],
['config-error',

View File

@ -141,6 +141,12 @@ class Fail2banRegexTest(LogCaptureTestCase):
))
self.assertLogged("Unable to compile regular expression")
def testWrongFilterOptions(self):
self.assertFalse(_test_exec(
"test", "flt[a='x,y,z',b=z,y,x]"
))
self.assertLogged("Wrong filter name or options", "wrong syntax at 14: y,x", all=True)
def testDirectFound(self):
self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",