mirror of https://github.com/fail2ban/fail2ban
amend with better (common) handling, documentation and tests
parent
7b05c1ce7a
commit
d9b8796792
|
@ -34,6 +34,30 @@ from ..helpers import getLogger, _as_bool, _merge_dicts, substituteRecursiveTags
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
|
||||||
|
CONVERTER = {
|
||||||
|
"bool": _as_bool,
|
||||||
|
"int": int,
|
||||||
|
}
|
||||||
|
def _OptionsTemplateGen(options):
|
||||||
|
"""Iterator over the options template with default options.
|
||||||
|
|
||||||
|
Each options entry is composed of an array or tuple with:
|
||||||
|
[[type, name, ?default?], ...]
|
||||||
|
Or it is a dict:
|
||||||
|
{name: [type, default], ...}
|
||||||
|
"""
|
||||||
|
if isinstance(options, (list,tuple)):
|
||||||
|
for optname in options:
|
||||||
|
if len(optname) > 2:
|
||||||
|
opttype, optname, optvalue = optname
|
||||||
|
else:
|
||||||
|
(opttype, optname), optvalue = optname, None
|
||||||
|
yield opttype, optname, optvalue
|
||||||
|
else:
|
||||||
|
for optname in options:
|
||||||
|
opttype, optvalue = options[optname]
|
||||||
|
yield opttype, optname, optvalue
|
||||||
|
|
||||||
|
|
||||||
class ConfigReader():
|
class ConfigReader():
|
||||||
"""Generic config reader class.
|
"""Generic config reader class.
|
||||||
|
@ -233,29 +257,17 @@ class ConfigReaderUnshared(SafeConfigParserWithIncludes):
|
||||||
if pOptions is None:
|
if pOptions is None:
|
||||||
pOptions = {}
|
pOptions = {}
|
||||||
# Get only specified options:
|
# Get only specified options:
|
||||||
for optname in options:
|
for opttype, optname, optvalue in _OptionsTemplateGen(options):
|
||||||
if isinstance(options, (list,tuple)):
|
|
||||||
if len(optname) > 2:
|
|
||||||
opttype, optname, optvalue = optname
|
|
||||||
else:
|
|
||||||
(opttype, optname), optvalue = optname, None
|
|
||||||
else:
|
|
||||||
opttype, optvalue = options[optname]
|
|
||||||
if optname in pOptions:
|
if optname in pOptions:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
if convert:
|
v = self.get(sec, optname, vars=pOptions)
|
||||||
if opttype == "bool":
|
|
||||||
v = self.getboolean(sec, optname)
|
|
||||||
if v is None: continue
|
|
||||||
elif opttype == "int":
|
|
||||||
v = self.getint(sec, optname)
|
|
||||||
if v is None: continue
|
|
||||||
else:
|
|
||||||
v = self.get(sec, optname, vars=pOptions)
|
|
||||||
else:
|
|
||||||
v = self.get(sec, optname, vars=pOptions)
|
|
||||||
values[optname] = v
|
values[optname] = v
|
||||||
|
if convert:
|
||||||
|
conv = CONVERTER.get(opttype)
|
||||||
|
if conv:
|
||||||
|
if v is None: continue
|
||||||
|
values[optname] = conv(v)
|
||||||
except NoSectionError as e:
|
except NoSectionError as e:
|
||||||
if shouldExist:
|
if shouldExist:
|
||||||
raise
|
raise
|
||||||
|
@ -350,33 +362,20 @@ class DefinitionInitConfigReader(ConfigReader):
|
||||||
if opt == '__name__' or opt in self._opts: continue
|
if opt == '__name__' or opt in self._opts: continue
|
||||||
self._opts[opt] = self.get("Definition", opt)
|
self._opts[opt] = self.get("Definition", opt)
|
||||||
|
|
||||||
def convertOptions(self, opts, pOptions={}):
|
def convertOptions(self, opts, configOpts):
|
||||||
options = self._configOpts
|
"""Convert interpolated combined options to expected type.
|
||||||
for optname in options:
|
"""
|
||||||
if isinstance(options, (list,tuple)):
|
for opttype, optname, optvalue in _OptionsTemplateGen(configOpts):
|
||||||
if len(optname) > 2:
|
conv = CONVERTER.get(opttype)
|
||||||
opttype, optname, optvalue = optname
|
if conv:
|
||||||
else:
|
v = opts.get(optname)
|
||||||
(opttype, optname), optvalue = optname, None
|
if v is None: continue
|
||||||
else:
|
try:
|
||||||
opttype, optvalue = options[optname]
|
opts[optname] = conv(v)
|
||||||
if optname in pOptions:
|
except ValueError:
|
||||||
continue
|
logSys.warning("Wrong %s value %r for %r. Using default one: %r",
|
||||||
try:
|
opttype, v, optname, optvalue)
|
||||||
if opttype == "bool":
|
opts[optname] = optvalue
|
||||||
v = opts.get(optname)
|
|
||||||
if v is None or isinstance(v, bool): continue
|
|
||||||
v = _as_bool(v)
|
|
||||||
opts[optname] = v
|
|
||||||
elif opttype == "int":
|
|
||||||
v = opts.get(optname)
|
|
||||||
if v is None or isinstance(v, (int, long)): continue
|
|
||||||
v = int(v)
|
|
||||||
opts[optname] = v
|
|
||||||
except ValueError:
|
|
||||||
logSys.warning("Wrong %s value %r for %r. Using default one: %r",
|
|
||||||
opttype, v, optname, optvalue)
|
|
||||||
opts[optname] = optvalue
|
|
||||||
|
|
||||||
def getCombOption(self, optname):
|
def getCombOption(self, optname):
|
||||||
"""Get combined definition option (as string) using pre-set and init
|
"""Get combined definition option (as string) using pre-set and init
|
||||||
|
@ -413,7 +412,7 @@ class DefinitionInitConfigReader(ConfigReader):
|
||||||
if not opts:
|
if not opts:
|
||||||
raise ValueError('recursive tag definitions unable to be resolved')
|
raise ValueError('recursive tag definitions unable to be resolved')
|
||||||
# convert options after all interpolations:
|
# convert options after all interpolations:
|
||||||
self.convertOptions(opts)
|
self.convertOptions(opts, self._configOpts)
|
||||||
return opts
|
return opts
|
||||||
|
|
||||||
def convert(self):
|
def convert(self):
|
||||||
|
|
|
@ -87,6 +87,21 @@ option = %s
|
||||||
self.assertTrue(self.c.read(f)) # we got some now
|
self.assertTrue(self.c.read(f)) # we got some now
|
||||||
return self.c.getOptions('section', [("int", 'option')])['option']
|
return self.c.getOptions('section', [("int", 'option')])['option']
|
||||||
|
|
||||||
|
def testConvert(self):
|
||||||
|
self.c.add_section("Definition")
|
||||||
|
self.c.set("Definition", "a", "1")
|
||||||
|
self.c.set("Definition", "b", "1")
|
||||||
|
self.c.set("Definition", "c", "test")
|
||||||
|
opts = self.c.getOptions("Definition",
|
||||||
|
(('int', 'a', 0), ('bool', 'b', 0), ('int', 'c', 0)))
|
||||||
|
self.assertSortedEqual(opts, {'a': 1, 'b': True, 'c': 0})
|
||||||
|
opts = self.c.getOptions("Definition",
|
||||||
|
(('int', 'a'), ('bool', 'b'), ('int', 'c')))
|
||||||
|
self.assertSortedEqual(opts, {'a': 1, 'b': True, 'c': None})
|
||||||
|
opts = self.c.getOptions("Definition",
|
||||||
|
{'a': ('int', 0), 'b': ('bool', 0), 'c': ('int', 0)})
|
||||||
|
self.assertSortedEqual(opts, {'a': 1, 'b': True, 'c': 0})
|
||||||
|
|
||||||
def testInaccessibleFile(self):
|
def testInaccessibleFile(self):
|
||||||
f = os.path.join(self.d, "d.conf") # inaccessible file
|
f = os.path.join(self.d, "d.conf") # inaccessible file
|
||||||
self._write('d.conf', 0)
|
self._write('d.conf', 0)
|
||||||
|
@ -483,11 +498,7 @@ class JailReaderTest(LogCaptureTestCase):
|
||||||
self.assertRaises(NoSectionError, c.getOptions, 'test', {})
|
self.assertRaises(NoSectionError, c.getOptions, 'test', {})
|
||||||
|
|
||||||
|
|
||||||
class FilterReaderTest(unittest.TestCase):
|
class FilterReaderTest(LogCaptureTestCase):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(FilterReaderTest, self).__init__(*args, **kwargs)
|
|
||||||
self.__share_cfg = {}
|
|
||||||
|
|
||||||
def testConvert(self):
|
def testConvert(self):
|
||||||
output = [
|
output = [
|
||||||
|
@ -533,6 +544,15 @@ class FilterReaderTest(unittest.TestCase):
|
||||||
output[0][-1] = 5; # maxlines = 5
|
output[0][-1] = 5; # maxlines = 5
|
||||||
self.assertSortedEqual(filterReader.convert(), output)
|
self.assertSortedEqual(filterReader.convert(), output)
|
||||||
|
|
||||||
|
def testConvertOptions(self):
|
||||||
|
filterReader = FilterReader("testcase01", "testcase01", {'maxlines': '<test>', 'test': 'X'},
|
||||||
|
share_config=TEST_FILES_DIR_SHARE_CFG, basedir=TEST_FILES_DIR)
|
||||||
|
filterReader.read()
|
||||||
|
filterReader.getOptions(None)
|
||||||
|
opts = filterReader.getCombined();
|
||||||
|
self.assertNotEqual(opts['maxlines'], 'X'); # wrong int value 'X' for 'maxlines'
|
||||||
|
self.assertLogged("Wrong int value 'X' for 'maxlines'. Using default one:")
|
||||||
|
|
||||||
def testFilterReaderSubstitionDefault(self):
|
def testFilterReaderSubstitionDefault(self):
|
||||||
output = [['set', 'jailname', 'addfailregex', 'to=sweet@example.com fromip=<IP>']]
|
output = [['set', 'jailname', 'addfailregex', 'to=sweet@example.com fromip=<IP>']]
|
||||||
filterReader = FilterReader('substition', "jailname", {},
|
filterReader = FilterReader('substition', "jailname", {},
|
||||||
|
|
Loading…
Reference in New Issue