move extractOptions from JailReader to helpers (common usage server- / client-side);

pull/1989/head
sebres 2017-12-05 17:49:22 +01:00
parent ff987b60cd
commit de97dedba0
6 changed files with 55 additions and 55 deletions

View File

@ -46,12 +46,12 @@ except ImportError:
FilterSystemd = None
from ..version import version
from .jailreader import JailReader
from .filterreader import FilterReader
from ..server.filter import Filter, FileContainer
from ..server.failregex import Regex, RegexException
from ..helpers import str2LogLevel, getVerbosityFormat, FormatterWithTraceBack, getLogger, PREFER_ENC
from ..helpers import str2LogLevel, getVerbosityFormat, FormatterWithTraceBack, getLogger, \
extractOptions, PREFER_ENC
# Gets the instance of the logger.
logSys = getLogger("fail2ban")
@ -287,7 +287,7 @@ class Fail2banRegex(object):
fltFile = None
fltOpt = {}
if regextype == 'fail':
fltName, fltOpt = JailReader.extractOptions(value)
fltName, fltOpt = extractOptions(value)
if fltName is not None:
if "." in fltName[~5:]:
tryNames = (fltName,)
@ -606,7 +606,7 @@ class Fail2banRegex(object):
return False
output( "Use systemd journal" )
output( "Use encoding : %s" % self._encoding )
backend, beArgs = JailReader.extractOptions(cmd_log)
backend, beArgs = extractOptions(cmd_log)
flt = FilterSystemd(None, **beArgs)
flt.setLogEncoding(self._encoding)
myjournal = flt.getJournalReader()

View File

@ -33,8 +33,7 @@ from .configreader import ConfigReaderUnshared, ConfigReader
from .filterreader import FilterReader
from .actionreader import ActionReader
from ..version import version
from ..helpers import getLogger
from ..helpers import splitwords
from ..helpers import getLogger, extractOptions, splitwords
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -42,15 +41,6 @@ logSys = getLogger(__name__)
class JailReader(ConfigReader):
# regex, to extract list of options:
optionCRE = re.compile(r"^([^\[]+)(?:\[(.*)\])?\s*$", re.DOTALL)
# regex, to iterate over single option in option list, syntax:
# `action = act[p1="...", p2='...', p3=...]`, where the p3=... not contains `,` or ']'
# since v0.10 separator extended with `]\s*[` for support of multiple option groups, syntax
# `action = act[p1=...][p2=...]`
optionExtractRE = re.compile(
r'([\w\-_\.]+)=(?:"([^"]*)"|\'([^\']*)\'|([^,\]]*))(?:,|\]\s*\[|$)', re.DOTALL)
def __init__(self, name, force_enable=False, **kwargs):
ConfigReader.__init__(self, **kwargs)
self.__name = name
@ -134,7 +124,7 @@ class JailReader(ConfigReader):
# Read filter
flt = self.__opts["filter"]
if flt:
filterName, filterOpt = JailReader.extractOptions(flt)
filterName, filterOpt = extractOptions(flt)
if not filterName:
raise JailDefError("Invalid filter definition %r" % flt)
self.__filter = FilterReader(
@ -164,7 +154,7 @@ class JailReader(ConfigReader):
try:
if not act: # skip empty actions
continue
actName, actOpt = JailReader.extractOptions(act)
actName, actOpt = extractOptions(act)
if not actName:
raise JailDefError("Invalid action definition %r" % act)
if actName.endswith(".py"):
@ -268,22 +258,5 @@ class JailReader(ConfigReader):
stream.insert(0, ["add", self.__name, backend])
return stream
@staticmethod
def extractOptions(option):
match = JailReader.optionCRE.match(option)
if not match:
# TODO proper error handling
return None, None
option_name, optstr = match.groups()
option_opts = dict()
if optstr:
for optmatch in JailReader.optionExtractRE.finditer(optstr):
opt = optmatch.group(1)
value = [
val for val in optmatch.group(2,3,4) if val is not None][0]
option_opts[opt.strip()] = value.strip()
return option_name, option_opts
class JailDefError(Exception):
pass

View File

@ -237,6 +237,34 @@ else:
return uni_decode(x, enc, 'replace')
#
# Following function used for parse options from parameter (e.g. `name[p1=0, p2="..."][p3='...']`).
#
# regex, to extract list of options:
OPTION_CRE = re.compile(r"^([^\[]+)(?:\[(.*)\])?\s*$", re.DOTALL)
# regex, to iterate over single option in option list, syntax:
# `action = act[p1="...", p2='...', p3=...]`, where the p3=... not contains `,` or ']'
# since v0.10 separator extended with `]\s*[` for support of multiple option groups, syntax
# `action = act[p1=...][p2=...]`
OPTION_EXTRACT_CRE = re.compile(
r'([\w\-_\.]+)=(?:"([^"]*)"|\'([^\']*)\'|([^,\]]*))(?:,|\]\s*\[|$)', re.DOTALL)
def extractOptions(option):
match = OPTION_CRE.match(option)
if not match:
# TODO proper error handling
return None, None
option_name, optstr = match.groups()
option_opts = dict()
if optstr:
for optmatch in OPTION_EXTRACT_CRE.finditer(optstr):
opt = optmatch.group(1)
value = [
val for val in optmatch.group(2,3,4) if val is not None][0]
option_opts[opt.strip()] = value.strip()
return option_name, option_opts
#
# Following facilities used for safe recursive interpolation of
# tags (<tag>) in tagged options.

View File

@ -27,8 +27,7 @@ import logging
import Queue
from .actions import Actions
from ..client.jailreader import JailReader
from ..helpers import getLogger, MyTime
from ..helpers import getLogger, extractOptions, MyTime
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -85,7 +84,7 @@ class Jail(object):
return "%s(%r)" % (self.__class__.__name__, self.name)
def _setBackend(self, backend):
backend, beArgs = JailReader.extractOptions(backend)
backend, beArgs = extractOptions(backend)
backend = backend.lower() # to assure consistent matching
backends = self._BACKENDS

View File

@ -30,7 +30,7 @@ import tempfile
import unittest
from ..client.configreader import ConfigReader, ConfigReaderUnshared, NoSectionError
from ..client import configparserinc
from ..client.jailreader import JailReader
from ..client.jailreader import JailReader, extractOptions
from ..client.filterreader import FilterReader
from ..client.jailsreader import JailsReader
from ..client.actionreader import ActionReader, CommandAction
@ -260,25 +260,25 @@ class JailReaderTest(LogCaptureTestCase):
# Simple example
option = "mail-whois[name=SSH]"
expected = ('mail-whois', {'name': 'SSH'})
result = JailReader.extractOptions(option)
result = extractOptions(option)
self.assertEqual(expected, result)
self.assertEqual(('mail.who_is', {}), JailReader.extractOptions("mail.who_is"))
self.assertEqual(('mail.who_is', {'a':'cat', 'b':'dog'}), JailReader.extractOptions("mail.who_is[a=cat,b=dog]"))
self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is"))
self.assertEqual(('mail.who_is', {}), extractOptions("mail.who_is"))
self.assertEqual(('mail.who_is', {'a':'cat', 'b':'dog'}), extractOptions("mail.who_is[a=cat,b=dog]"))
self.assertEqual(('mail--ho_is', {}), extractOptions("mail--ho_is"))
self.assertEqual(('mail--ho_is', {}), JailReader.extractOptions("mail--ho_is['s']"))
self.assertEqual(('mail--ho_is', {}), extractOptions("mail--ho_is['s']"))
#self.printLog()
#self.assertLogged("Invalid argument ['s'] in ''s''")
self.assertEqual(('mail', {'a': ','}), JailReader.extractOptions("mail[a=',']"))
self.assertEqual(('mail', {'a': ','}), extractOptions("mail[a=',']"))
#self.assertRaises(ValueError, JailReader.extractOptions ,'mail-how[')
#self.assertRaises(ValueError, extractOptions ,'mail-how[')
# Empty option
option = "abc[]"
expected = ('abc', {})
result = JailReader.extractOptions(option)
result = extractOptions(option)
self.assertEqual(expected, result)
# More complex examples
@ -296,11 +296,11 @@ class JailReaderTest(LogCaptureTestCase):
'opt10': "",
'opt11': "",
})
result = JailReader.extractOptions(option)
result = extractOptions(option)
self.assertEqual(expected, result)
# And multiple groups (`][` instead of `,`)
result = JailReader.extractOptions(option.replace(',', ']['))
result = extractOptions(option.replace(',', ']['))
expected2 = (expected[0],
dict((k, v.replace(',', '][')) for k, v in expected[1].iteritems())
)
@ -439,7 +439,7 @@ class FilterReaderTest(unittest.TestCase):
def testFilterReaderSubstitionKnown(self):
output = [['set', 'jailname', 'addfailregex', 'to=test,sweet@example.com,test2,sweet@example.com fromip=<IP>']]
filterName, filterOpt = JailReader.extractOptions(
filterName, filterOpt = extractOptions(
'substition[honeypot="<sweet>,<known/honeypot>", sweet="test,<known/honeypot>,test2"]')
filterReader = FilterReader('substition', "jailname", filterOpt,
share_config=TEST_FILES_DIR_SHARE_CFG, basedir=TEST_FILES_DIR)
@ -650,7 +650,7 @@ class JailsReaderTest(LogCaptureTestCase):
if jail == 'INCLUDES':
continue
filterName = jails.get(jail, 'filter')
filterName, filterOpt = JailReader.extractOptions(filterName)
filterName, filterOpt = extractOptions(filterName)
allFilters.add(filterName)
self.assertTrue(len(filterName))
# moreover we must have a file for it
@ -669,7 +669,7 @@ class JailsReaderTest(LogCaptureTestCase):
# somewhat duplicating here what is done in JailsReader if
# the jail is enabled
for act in actions.split('\n'):
actName, actOpt = JailReader.extractOptions(act)
actName, actOpt = extractOptions(act)
self.assertTrue(len(actName))
self.assertTrue(isinstance(actOpt, dict))
if actName == 'iptables-multiport':
@ -696,7 +696,7 @@ class JailsReaderTest(LogCaptureTestCase):
if not (a.endswith('common.conf') or a.endswith('-aggressive.conf')))
# get filters of all jails (filter names without options inside filter[...])
filters_jail = set(
JailReader.extractOptions(jail.options['filter'])[0] for jail in jails.jails
extractOptions(jail.options['filter'])[0] for jail in jails.jails
)
self.maxDiff = None
self.assertTrue(filters.issubset(filters_jail),

View File

@ -42,7 +42,7 @@ from ..server.ticket import BanTicket
from ..server.utils import Utils
from .dummyjail import DummyJail
from .utils import LogCaptureTestCase
from ..helpers import getLogger, PREFER_ENC
from ..helpers import getLogger, extractOptions, PREFER_ENC
from .. import version
try:
@ -1034,7 +1034,7 @@ class LoggingTests(LogCaptureTestCase):
os.remove(f)
from clientreadertestcase import ActionReader, JailReader, JailsReader, CONFIG_DIR, STOCK
from clientreadertestcase import ActionReader, JailsReader, CONFIG_DIR, STOCK
class ServerConfigReaderTests(LogCaptureTestCase):
@ -1145,7 +1145,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
def getDefaultJailStream(self, jail, act):
act = act.replace('%(__name__)s', jail)
actName, actOpt = JailReader.extractOptions(act)
actName, actOpt = extractOptions(act)
stream = [
['add', jail, 'polling'],
# ['set', jail, 'addfailregex', 'DUMMY-REGEX <HOST>'],