allow to use filter options by fail2ban-regex, example:

fail2ban-regex text.log "sshd[mode=aggressive]"
pull/1710/head
sebres 2017-03-11 00:06:29 +01:00
parent 8af7a73bfc
commit 6a26602ba8
3 changed files with 64 additions and 20 deletions

View File

@ -120,6 +120,8 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
version="%prog " + version)
p.add_options([
Option("-c", "--config", default='/etc/fail2ban',
help="set alternate config directory"),
Option("-d", "--datepattern",
help="set custom pattern used to match date/times"),
Option("-e", "--encoding", default=PREFER_ENC,
@ -271,24 +273,55 @@ class Fail2banRegex(object):
def readRegex(self, value, regextype):
assert(regextype in ('fail', 'ignore'))
regex = regextype + 'regex'
if regextype == 'fail' and (os.path.isfile(value) or os.path.isfile(value + '.conf')):
if os.path.basename(os.path.dirname(value)) == 'filter.d':
# try to check - we've case filter?[options...]?:
basedir = self._opts.config
fltFile = None
fltOpt = {}
if regextype == 'fail':
fltName, fltOpt = JailReader.extractOptions(value)
if fltName is not None:
if "." in fltName[~5:]:
tryNames = (fltName,)
else:
tryNames = (fltName, fltName + '.conf', fltName + '.local')
for fltFile in tryNames:
if not "/" in fltFile:
if os.path.basename(basedir) == 'filter.d':
fltFile = os.path.join(basedir, fltFile)
else:
fltFile = os.path.join(basedir, 'filter.d', fltFile)
else:
basedir = os.path.dirname(fltFile)
if os.path.isfile(fltFile):
break
fltFile = None
# if it is filter file:
if fltFile is not None:
if (basedir == self._opts.config
or os.path.basename(basedir) == 'filter.d'
or ("." not in fltName[~5:] and "/" not in fltName)
):
## within filter.d folder - use standard loading algorithm to load filter completely (with .local etc.):
basedir = os.path.dirname(os.path.dirname(value))
value = os.path.splitext(os.path.basename(value))[0]
output( "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) )
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir)
if not reader.read(): # pragma: no cover
output( "ERROR: failed to load filter %s" % value )
return False
else: # pragma: no cover
if os.path.basename(basedir) == 'filter.d':
basedir = os.path.dirname(basedir)
fltName = os.path.splitext(os.path.basename(fltName))[0]
output( "Use %11s filter file : %s, basedir: %s" % (regex, fltName, basedir) )
else:
## foreign file - readexplicit this file and includes if possible:
output( "Use %11s file : %s" % (regex, fltName) )
basedir = None
if fltOpt:
output( "Use filter options : %r" % fltOpt )
reader = FilterReader(fltName, 'fail2ban-regex-jail', fltOpt, share_config=self.share_config, basedir=basedir)
if basedir is not None: # pragma: no cover
ret = reader.read()
else:
## foreign file - readexplicit this file and includes if possible:
output( "Use %11s file : %s" % (regex, value) )
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config)
reader.setBaseDir(None)
if not reader.readexplicit():
output( "ERROR: failed to read %s" % value )
return False
ret = reader.readexplicit()
if not ret:
output( "ERROR: failed to load filter %s" % value )
return False
reader.getOptions(None)
readercommands = reader.convert()

View File

@ -43,7 +43,7 @@ logSys = getLogger(__name__)
class JailReader(ConfigReader):
# regex, to extract list of options:
optionCRE = re.compile(r"^([\w\-_\.]+)(?:\[(.*)\])?\s*$", re.DOTALL)
optionCRE = re.compile(r"^([^\[]+)(?:\[(.*)\])?\s*$", re.DOTALL)
# regex, to iterate over single option in option list, syntax:
# `action = act[p1="...", p2='...', p3=...]`, where the p3=... not contains `,` or ']'
# since v0.10 separator extended with `]\s*[` for support of multiple option groups, syntax

View File

@ -209,7 +209,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
"-v", "--verbose-date", "--print-all-matched",
Fail2banRegexTest.FILENAME_SSHD, Fail2banRegexTest.FILTER_SSHD
"-c", CONFIG_DIR,
Fail2banRegexTest.FILENAME_SSHD, "sshd"
)
self.assertTrue(fail2banRegex.start(args))
# test failure line and not-failure lines both presents:
@ -220,7 +221,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched",
Fail2banRegexTest.FILENAME_ZZZ_SSHD, Fail2banRegexTest.FILTER_SSHD
"-c", CONFIG_DIR,
Fail2banRegexTest.FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]"
)
self.assertTrue(fail2banRegex.start(args))
# test failure line and all not-failure lines presents:
@ -234,7 +236,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched", "--print-all-missed",
Fail2banRegexTest.FILENAME_ZZZ_SSHD, Fail2banRegexTest.FILTER_ZZZ_SSHD
"-c", os.path.dirname(Fail2banRegexTest.FILTER_ZZZ_SSHD),
Fail2banRegexTest.FILENAME_ZZZ_SSHD, os.path.basename(Fail2banRegexTest.FILTER_ZZZ_SSHD)
)
self.assertTrue(fail2banRegex.start(args))
# test "failure" line presents (2nd part only, because multiline fewer precise):
@ -245,10 +248,18 @@ class Fail2banRegexTest(LogCaptureTestCase):
# by the way test of ignoreregex (specified in filter file)...
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN+"[mode=test]"
)
self.assertTrue(fail2banRegex.start(args))
def testWrongFilterFile(self):
# use test log as filter file to cover eror cases...
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILENAME_ZZZ_GEN
)
self.assertFalse(fail2banRegex.start(args))
def _reset(self):
# reset global warn-counter:
from ..server.filter import _decode_line_warn