fail2ban-regex - several enhancements and fixes:

- improved usage output (don't put a long help if an error occurs);
- new option `--no-check-all` to avoid check of all regex's (first matched only);
- new option `-o`, `--out` to set token provided in output (disables check-all and outputs only expected data);
- test cases optimized and extended
pull/2601/head
sebres 2020-01-09 16:59:13 +01:00
parent dbc6590589
commit d1b7e2b5fb
3 changed files with 195 additions and 165 deletions

View File

@ -152,6 +152,10 @@ filter = flt[logtype=short]
* partially implements gh-980 (more breakdown safe handling); * partially implements gh-980 (more breakdown safe handling);
* closes gh-1680 (better as large-scale banning implementation with on-demand reban by failure, * closes gh-1680 (better as large-scale banning implementation with on-demand reban by failure,
at least unless a bulk-ban gets implemented); at least unless a bulk-ban gets implemented);
* fail2ban-regex - several enhancements and fixes:
- improved usage output (don't put a long help if an error occurs);
- new option `--no-check-all` to avoid check of all regex's (first matched only);
- new option `-o`, `--out` to set token only provided in output (disables check-all and outputs only expected data).
ver. 0.10.4 (2018/10/04) - ten-four-on-due-date-ten-four ver. 0.10.4 (2018/10/04) - ten-four-on-due-date-ten-four

View File

@ -25,7 +25,13 @@ This tools can test regular expressions for "fail2ban".
""" """
__author__ = "Fail2Ban Developers" __author__ = "Fail2Ban Developers"
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2014 Yaroslav Halchenko" __copyright__ = """Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
Copyright of modifications held by their respective authors.
Licensed under the GNU General Public License v2 (GPL).
Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>.
Many contributions by Yaroslav O. Halchenko, Steven Hiscocks, Sergey G. Brester (sebres)."""
__license__ = "GPL" __license__ = "GPL"
import getopt import getopt
@ -97,11 +103,12 @@ def dumpNormVersion(*args):
output(normVersion()) output(normVersion())
sys.exit(0) sys.exit(0)
def get_opt_parser(): usage = lambda: "%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]" % sys.argv[0]
# use module docstring for help output
p = OptionParser( class _f2bOptParser(OptionParser):
usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__ def format_help(self, *args, **kwargs):
+ """ """ Overwritten format helper with full ussage."""
return usage() + __doc__ + """
LOG: LOG:
string a string representing a log line string a string representing a log line
filename path to a log file (/var/log/auth.log) filename path to a log file (/var/log/auth.log)
@ -114,16 +121,15 @@ REGEX:
IGNOREREGEX: IGNOREREGEX:
string a string representing an 'ignoreregex' string a string representing an 'ignoreregex'
filename path to a filter file (filter.d/sshd.conf) filename path to a filter file (filter.d/sshd.conf)
""" + OptionParser.format_help(self, *args, **kwargs) + """\n
Report bugs to https://github.com/fail2ban/fail2ban/issues\n
""" + __copyright__ + "\n"
Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
Copyright of modifications held by their respective authors.
Licensed under the GNU General Public License v2 (GPL).
Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>. def get_opt_parser():
Many contributions by Yaroslav O. Halchenko and Steven Hiscocks. # use module docstring for help output
p = _f2bOptParser(
Report bugs to https://github.com/fail2ban/fail2ban/issues usage=usage(),
""",
version="%prog " + version) version="%prog " + version)
p.add_options([ p.add_options([
@ -160,6 +166,10 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
help="Verbose date patterns/regex in output"), help="Verbose date patterns/regex in output"),
Option("-D", "--debuggex", action='store_true', Option("-D", "--debuggex", action='store_true',
help="Produce debuggex.com urls for debugging there"), help="Produce debuggex.com urls for debugging there"),
Option("--no-check-all", action="store_false", dest="checkAllRegex", default=True,
help="Disable check for all regex's"),
Option("-o", "--out", action="store", dest="out", default=None,
help="Set token to print failure information only (row, id, ip, msg, host, ip4, ip6, dns, matches, ...)"),
Option("--print-no-missed", action='store_true', Option("--print-no-missed", action='store_true',
help="Do not print any missed lines"), help="Do not print any missed lines"),
Option("--print-no-ignored", action='store_true', Option("--print-no-ignored", action='store_true',
@ -234,6 +244,7 @@ class Fail2banRegex(object):
def __init__(self, opts): def __init__(self, opts):
# set local protected members from given options: # set local protected members from given options:
self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.iteritems())) self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.iteritems()))
self._opts = opts
self._maxlines_set = False # so we allow to override maxlines in cmdline self._maxlines_set = False # so we allow to override maxlines in cmdline
self._datepattern_set = False self._datepattern_set = False
self._journalmatch = None self._journalmatch = None
@ -259,10 +270,12 @@ class Fail2banRegex(object):
self._filter.setUseDns(opts.usedns) self._filter.setUseDns(opts.usedns)
self._filter.returnRawHost = opts.raw self._filter.returnRawHost = opts.raw
self._filter.checkFindTime = False self._filter.checkFindTime = False
self._filter.checkAllRegex = True self._filter.checkAllRegex = opts.checkAllRegex and not opts.out
self._opts = opts
self._backend = 'auto' self._backend = 'auto'
def output(self, line):
if not self._opts.out: output(line)
def decode_line(self, line): def decode_line(self, line):
return FileContainer.decode_line('<LOG>', self._encoding, line) return FileContainer.decode_line('<LOG>', self._encoding, line)
@ -274,14 +287,14 @@ class Fail2banRegex(object):
self._filter.setDatePattern(pattern) self._filter.setDatePattern(pattern)
self._datepattern_set = True self._datepattern_set = True
if pattern is not None: if pattern is not None:
output( "Use datepattern : %s" % ( self.output( "Use datepattern : %s" % (
self._filter.getDatePattern()[1], ) ) self._filter.getDatePattern()[1], ) )
def setMaxLines(self, v): def setMaxLines(self, v):
if not self._maxlines_set: if not self._maxlines_set:
self._filter.setMaxLines(int(v)) self._filter.setMaxLines(int(v))
self._maxlines_set = True self._maxlines_set = True
output( "Use maxlines : %d" % self._filter.getMaxLines() ) self.output( "Use maxlines : %d" % self._filter.getMaxLines() )
def setJournalMatch(self, v): def setJournalMatch(self, v):
self._journalmatch = v self._journalmatch = v
@ -297,7 +310,7 @@ class Fail2banRegex(object):
realopts[k] = combopts[k] if k in combopts else reader.get('Definition', k) realopts[k] = combopts[k] if k in combopts else reader.get('Definition', k)
except NoOptionError: # pragma: no cover except NoOptionError: # pragma: no cover
pass pass
output("Real filter options : %r" % realopts) self.output("Real filter options : %r" % realopts)
def readRegex(self, value, regextype): def readRegex(self, value, regextype):
assert(regextype in ('fail', 'ignore')) assert(regextype in ('fail', 'ignore'))
@ -334,15 +347,15 @@ class Fail2banRegex(object):
if os.path.basename(basedir) == 'filter.d': if os.path.basename(basedir) == 'filter.d':
basedir = os.path.dirname(basedir) basedir = os.path.dirname(basedir)
fltName = os.path.splitext(os.path.basename(fltName))[0] fltName = os.path.splitext(os.path.basename(fltName))[0]
output( "Use %11s filter file : %s, basedir: %s" % (regex, fltName, basedir) ) self.output( "Use %11s filter file : %s, basedir: %s" % (regex, fltName, basedir) )
else: else:
## foreign file - readexplicit this file and includes if possible: ## foreign file - readexplicit this file and includes if possible:
output( "Use %11s file : %s" % (regex, fltName) ) self.output( "Use %11s file : %s" % (regex, fltName) )
basedir = None basedir = None
if not os.path.isabs(fltName): # avoid join with "filter.d" inside FilterReader if not os.path.isabs(fltName): # avoid join with "filter.d" inside FilterReader
fltName = os.path.abspath(fltName) fltName = os.path.abspath(fltName)
if fltOpt: if fltOpt:
output( "Use filter options : %r" % fltOpt ) self.output( "Use filter options : %r" % fltOpt )
reader = FilterReader(fltName, 'fail2ban-regex-jail', fltOpt, share_config=self.share_config, basedir=basedir) reader = FilterReader(fltName, 'fail2ban-regex-jail', fltOpt, share_config=self.share_config, basedir=basedir)
ret = None ret = None
try: try:
@ -410,7 +423,7 @@ class Fail2banRegex(object):
return False return False
else: else:
output( "Use %11s line : %s" % (regex, shortstr(value)) ) self.output( "Use %11s line : %s" % (regex, shortstr(value)) )
regex_values = {regextype: [RegexStat(value)]} regex_values = {regextype: [RegexStat(value)]}
for regextype, regex_values in regex_values.iteritems(): for regextype, regex_values in regex_values.iteritems():
@ -506,6 +519,20 @@ class Fail2banRegex(object):
if len(ret) > 0: if len(ret) > 0:
assert(not is_ignored) assert(not is_ignored)
if self._opts.out:
if self._opts.out in ('id', 'ip'):
for ret in ret:
output(ret[1])
elif self._opts.out == 'msg':
for ret in ret:
output('\n'.join(map(lambda v:''.join(v for v in v), ret[3].get('matches'))))
elif self._opts.out == 'row':
for ret in ret:
output('[%r,\t%r,\t%r],' % (ret[1],ret[2],dict((k,v) for k, v in ret[3].iteritems() if k != 'matches')))
else:
for ret in ret:
output(ret[3].get(self._opts.out))
continue
self._line_stats.matched += 1 self._line_stats.matched += 1
if self._print_all_matched: if self._print_all_matched:
self._line_stats.matched_lines.append(line) self._line_stats.matched_lines.append(line)
@ -554,6 +581,7 @@ class Fail2banRegex(object):
"to print all %d lines" % (header, ltype, lines) ) "to print all %d lines" % (header, ltype, lines) )
def printStats(self): def printStats(self):
if self._opts.out: return True
output( "" ) output( "" )
output( "Results" ) output( "Results" )
output( "=======" ) output( "=======" )
@ -636,8 +664,8 @@ class Fail2banRegex(object):
if os.path.isfile(cmd_log): if os.path.isfile(cmd_log):
try: try:
hdlr = open(cmd_log, 'rb') hdlr = open(cmd_log, 'rb')
output( "Use log file : %s" % cmd_log ) self.output( "Use log file : %s" % cmd_log )
output( "Use encoding : %s" % self._encoding ) self.output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr) test_lines = self.file_lines_gen(hdlr)
except IOError as e: # pragma: no cover except IOError as e: # pragma: no cover
output( e ) output( e )
@ -646,8 +674,8 @@ class Fail2banRegex(object):
if not FilterSystemd: if not FilterSystemd:
output( "Error: systemd library not found. Exiting..." ) output( "Error: systemd library not found. Exiting..." )
return False return False
output( "Use systemd journal" ) self.output( "Use systemd journal" )
output( "Use encoding : %s" % self._encoding ) self.output( "Use encoding : %s" % self._encoding )
backend, beArgs = extractOptions(cmd_log) backend, beArgs = extractOptions(cmd_log)
flt = FilterSystemd(None, **beArgs) flt = FilterSystemd(None, **beArgs)
flt.setLogEncoding(self._encoding) flt.setLogEncoding(self._encoding)
@ -656,23 +684,23 @@ class Fail2banRegex(object):
self.setDatePattern(None) self.setDatePattern(None)
if journalmatch: if journalmatch:
flt.addJournalMatch(journalmatch) flt.addJournalMatch(journalmatch)
output( "Use journal match : %s" % " ".join(journalmatch) ) self.output( "Use journal match : %s" % " ".join(journalmatch) )
test_lines = journal_lines_gen(flt, myjournal) test_lines = journal_lines_gen(flt, myjournal)
else: else:
# if single line parsing (without buffering) # if single line parsing (without buffering)
if self._filter.getMaxLines() <= 1: if self._filter.getMaxLines() <= 1:
output( "Use single line : %s" % shortstr(cmd_log.replace("\n", r"\n")) ) self.output( "Use single line : %s" % shortstr(cmd_log.replace("\n", r"\n")) )
test_lines = [ cmd_log ] test_lines = [ cmd_log ]
else: # multi line parsing (with buffering) else: # multi line parsing (with buffering)
test_lines = cmd_log.split("\n") test_lines = cmd_log.split("\n")
output( "Use multi line : %s line(s)" % len(test_lines) ) self.output( "Use multi line : %s line(s)" % len(test_lines) )
for i, l in enumerate(test_lines): for i, l in enumerate(test_lines):
if i >= 5: if i >= 5:
output( "| ..." ); break self.output( "| ..." ); break
output( "| %2.2s: %s" % (i+1, shortstr(l)) ) self.output( "| %2.2s: %s" % (i+1, shortstr(l)) )
output( "`-" ) self.output( "`-" )
output( "" ) self.output( "" )
self.process(test_lines) self.process(test_lines)
@ -695,14 +723,15 @@ def exec_command_line(*args):
if not len(args) in (2, 3): if not len(args) in (2, 3):
errors.append("ERROR: provide both <LOG> and <REGEX>.") errors.append("ERROR: provide both <LOG> and <REGEX>.")
if errors: if errors:
sys.stderr.write("\n".join(errors) + "\n\n")
parser.print_help() parser.print_help()
sys.stderr.write("\n" + "\n".join(errors) + "\n")
sys.exit(255) sys.exit(255)
output( "" ) if not opts.out:
output( "Running tests" ) output( "" )
output( "=============" ) output( "Running tests" )
output( "" ) output( "=============" )
output( "" )
# Log level (default critical): # Log level (default critical):
opts.log_level = str2LogLevel(opts.log_level) opts.log_level = str2LogLevel(opts.log_level)

View File

@ -52,6 +52,10 @@ def _Fail2banRegex(*args):
logSys.setLevel(str2LogLevel(opts.log_level)) logSys.setLevel(str2LogLevel(opts.log_level))
return (opts, args, Fail2banRegex(opts)) return (opts, args, Fail2banRegex(opts))
def _test_exec(*args):
(opts, args, fail2banRegex) = _Fail2banRegex(*args)
return fail2banRegex.start(args)
class ExitException(Exception): class ExitException(Exception):
def __init__(self, code): def __init__(self, code):
self.code = code self.code = code
@ -76,23 +80,27 @@ def _test_exec_command_line(*args):
sys.stderr = _org['stderr'] sys.stderr = _org['stderr']
return _exit_code return _exit_code
STR_00 = "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
RE_00_ID = r"Authentication failure for <F-ID>.*?</F-ID> from <HOST>$"
RE_00_USER = r"Authentication failure for <F-USER>.*?</F-USER> from <HOST>$"
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
class Fail2banRegexTest(LogCaptureTestCase): class Fail2banRegexTest(LogCaptureTestCase):
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
def setUp(self): def setUp(self):
"""Call before every test case.""" """Call before every test case."""
LogCaptureTestCase.setUp(self) LogCaptureTestCase.setUp(self)
@ -104,57 +112,50 @@ class Fail2banRegexTest(LogCaptureTestCase):
tearDownMyTime() tearDownMyTime()
def testWrongRE(self): def testWrongRE(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertFalse(_test_exec(
"test", r".** from <HOST>$" "test", r".** from <HOST>$"
) ))
self.assertFalse(fail2banRegex.start(args))
self.assertLogged("Unable to compile regular expression") self.assertLogged("Unable to compile regular expression")
def testWrongIngnoreRE(self): def testWrongIngnoreRE(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertFalse(_test_exec(
"--datepattern", "{^LN-BEG}EPOCH", "--datepattern", "{^LN-BEG}EPOCH",
"test", r".*? from <HOST>$", r".**" "test", r".*? from <HOST>$", r".**"
) ))
self.assertFalse(fail2banRegex.start(args))
self.assertLogged("Unable to compile regular expression") self.assertLogged("Unable to compile regular expression")
def testDirectFound(self): def testDirectFound(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-no-missed", "--print-all-matched", "--print-no-missed",
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", STR_00,
r"Authentication failure for .*? from <HOST>$" r"Authentication failure for .*? from <HOST>$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testDirectNotFound(self): def testDirectNotFound(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--print-all-missed", "--print-all-missed",
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", STR_00,
r"XYZ from <HOST>$" r"XYZ from <HOST>$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed') self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed')
def testDirectIgnored(self): def testDirectIgnored(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--print-all-ignored", "--print-all-ignored",
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", STR_00,
r"Authentication failure for .*? from <HOST>$", r"Authentication failure for .*? from <HOST>$",
r"kevin from 192.0.2.0$" r"kevin from 192.0.2.0$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed') self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed')
def testDirectRE_1(self): def testDirectRE_1(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-all-matched",
Fail2banRegexTest.FILENAME_01, FILENAME_01, RE_00
Fail2banRegexTest.RE_00 ))
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed') self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
self.assertLogged('Error decoding line'); self.assertLogged('Error decoding line');
@ -164,90 +165,78 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10') self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10')
def testDirectRE_1raw(self): def testDirectRE_1raw(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--print-all-matched", "--raw",
Fail2banRegexTest.FILENAME_01, FILENAME_01, RE_00
Fail2banRegexTest.RE_00 ))
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 19 lines, 0 ignored, 16 matched, 3 missed') self.assertLogged('Lines: 19 lines, 0 ignored, 16 matched, 3 missed')
def testDirectRE_1raw_noDns(self): def testDirectRE_1raw_noDns(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--usedns=no", "--print-all-matched", "--raw", "--usedns=no",
Fail2banRegexTest.FILENAME_01, FILENAME_01, RE_00
Fail2banRegexTest.RE_00 ))
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed') self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
# usage of <F-ID>\S+</F-ID> causes raw handling automatically: # usage of <F-ID>\S+</F-ID> causes raw handling automatically:
self.pruneLog() self.pruneLog()
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-d", "^Epoch", "-d", "^Epoch",
"1490349000 test failed.dns.ch", "^\s*test <F-ID>\S+</F-ID>" "1490349000 test failed.dns.ch", "^\s*test <F-ID>\S+</F-ID>"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed', all=True) self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed', all=True)
self.assertNotLogged('Unable to find a corresponding IP address') self.assertNotLogged('Unable to find a corresponding IP address')
def testDirectRE_2(self): def testDirectRE_2(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-all-matched",
Fail2banRegexTest.FILENAME_02, FILENAME_02, RE_00
Fail2banRegexTest.RE_00 ))
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed') self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
def testVerbose(self): def testVerbose(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--timezone", "UTC+0200", "--timezone", "UTC+0200",
"--verbose", "--verbose-date", "--print-no-missed", "--verbose", "--verbose-date", "--print-no-missed",
Fail2banRegexTest.FILENAME_02, FILENAME_02, RE_00
Fail2banRegexTest.RE_00 ))
)
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed') self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
self.assertLogged('141.3.81.106 Sun Aug 14 11:53:59 2005') self.assertLogged('141.3.81.106 Sun Aug 14 11:53:59 2005')
self.assertLogged('141.3.81.106 Sun Aug 14 11:54:59 2005') self.assertLogged('141.3.81.106 Sun Aug 14 11:54:59 2005')
def testVerboseFullSshd(self): def testVerboseFullSshd(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"-v", "--verbose-date", "--print-all-matched", "--print-all-ignored", "-v", "--verbose-date", "--print-all-matched", "--print-all-ignored",
"-c", CONFIG_DIR, "-c", CONFIG_DIR,
Fail2banRegexTest.FILENAME_SSHD, "sshd" FILENAME_SSHD, "sshd"
) ))
self.assertTrue(fail2banRegex.start(args))
# test failure line and not-failure lines both presents: # test failure line and not-failure lines both presents:
self.assertLogged("[29116]: User root not allowed because account is locked", self.assertLogged("[29116]: User root not allowed because account is locked",
"[29116]: Received disconnect from 1.2.3.4", all=True) "[29116]: Received disconnect from 1.2.3.4", all=True)
self.pruneLog() self.pruneLog()
# show real options: # show real options:
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"-vv", "-c", CONFIG_DIR, "-vv", "-c", CONFIG_DIR,
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.1", "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.1",
"sshd[logtype=short]" "sshd[logtype=short]"
) ))
self.assertTrue(fail2banRegex.start(args))
# tet logtype is specified and set in real options: # tet logtype is specified and set in real options:
self.assertLogged("Real filter options :", "'logtype': 'short'", all=True) self.assertLogged("Real filter options :", "'logtype': 'short'", all=True)
self.assertNotLogged("'logtype': 'file'", "'logtype': 'journal'", all=True) self.assertNotLogged("'logtype': 'file'", "'logtype': 'journal'", all=True)
def testFastSshd(self): def testFastSshd(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched", "--print-all-matched",
"-c", CONFIG_DIR, "-c", CONFIG_DIR,
Fail2banRegexTest.FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]" FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]"
) ))
self.assertTrue(fail2banRegex.start(args))
# test failure line and all not-failure lines presents: # test failure line and all not-failure lines presents:
self.assertLogged( self.assertLogged(
"[29116]: Connection from 192.0.2.4", "[29116]: Connection from 192.0.2.4",
@ -256,93 +245,107 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testMultilineSshd(self): def testMultilineSshd(self):
# by the way test of missing lines by multiline in `for bufLine in orgLineBuffer[int(fullBuffer):]` # by the way test of missing lines by multiline in `for bufLine in orgLineBuffer[int(fullBuffer):]`
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched", "--print-all-missed", "--print-all-matched", "--print-all-missed",
"-c", os.path.dirname(Fail2banRegexTest.FILTER_ZZZ_SSHD), "-c", os.path.dirname(FILTER_ZZZ_SSHD),
Fail2banRegexTest.FILENAME_ZZZ_SSHD, os.path.basename(Fail2banRegexTest.FILTER_ZZZ_SSHD) FILENAME_ZZZ_SSHD, os.path.basename(FILTER_ZZZ_SSHD)
) ))
self.assertTrue(fail2banRegex.start(args))
# test "failure" line presents (2nd part only, because multiline fewer precise): # test "failure" line presents (2nd part only, because multiline fewer precise):
self.assertLogged( self.assertLogged(
"[29116]: Received disconnect from 192.0.2.4", all=True) "[29116]: Received disconnect from 192.0.2.4", all=True)
def testFullGeneric(self): def testFullGeneric(self):
# by the way test of ignoreregex (specified in filter file)... # by the way test of ignoreregex (specified in filter file)...
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN+"[mode=test]" FILENAME_ZZZ_GEN, FILTER_ZZZ_GEN+"[mode=test]"
) ))
self.assertTrue(fail2banRegex.start(args))
def testDirectMultilineBuf(self): def testDirectMultilineBuf(self):
# test it with some pre-lines also to cover correct buffer scrolling (all multi-lines printed): # test it with some pre-lines also to cover correct buffer scrolling (all multi-lines printed):
for preLines in (0, 20): for preLines in (0, 20):
self.pruneLog("[test-phase %s]" % preLines) self.pruneLog("[test-phase %s]" % preLines)
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--maxlines", "5", "--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--maxlines", "5",
("1490349000 TEST-NL\n"*preLines) + ("1490349000 TEST-NL\n"*preLines) +
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34", "1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$" r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: %s lines, 0 ignored, 2 matched, %s missed' % (preLines+4, preLines+2)) self.assertLogged('Lines: %s lines, 0 ignored, 2 matched, %s missed' % (preLines+4, preLines+2))
# both matched lines were printed: # both matched lines were printed:
self.assertLogged("| 1490349000 FAIL", "| 1490349001 HOST 192.0.2.34", all=True) self.assertLogged("| 1490349000 FAIL", "| 1490349001 HOST 192.0.2.34", all=True)
def testDirectMultilineBufDebuggex(self): def testDirectMultilineBufDebuggex(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--debuggex", "--print-all-matched", "--maxlines", "5", "--usedns", "no", "-d", "^Epoch", "--debuggex", "--print-all-matched", "--maxlines", "5",
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34", "1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$" r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed') self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
# the sequence in args-dict is currently undefined (so can be 1st argument) # the sequence in args-dict is currently undefined (so can be 1st argument)
self.assertLogged("&flags=m", "?flags=m") self.assertLogged("&flags=m", "?flags=m")
def testSinglelineWithNLinContent(self): def testSinglelineWithNLinContent(self):
# #
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--usedns", "no", "-d", "^Epoch", "--print-all-matched",
"1490349000 FAIL: failure\nhost: 192.0.2.35", "1490349000 FAIL: failure\nhost: 192.0.2.35",
r"^\s*FAIL:\s*.*\nhost:\s+<HOST>$" r"^\s*FAIL:\s*.*\nhost:\s+<HOST>$"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testRegexEpochPatterns(self): def testRegexEpochPatterns(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-r", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5", "-r", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5",
"[1516469849] 192.0.2.1 FAIL: failure\n" "[1516469849] 192.0.2.1 FAIL: failure\n"
"[1516469849551] 192.0.2.2 FAIL: failure\n" "[1516469849551] 192.0.2.2 FAIL: failure\n"
"[1516469849551000] 192.0.2.3 FAIL: failure\n" "[1516469849551000] 192.0.2.3 FAIL: failure\n"
"[1516469849551.000] 192.0.2.4 FAIL: failure", "[1516469849551.000] 192.0.2.4 FAIL: failure",
r"^<HOST> FAIL\b" r"^<HOST> FAIL\b"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed') self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
def testRegexSubnet(self): def testRegexSubnet(self):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-vv", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5", "-vv", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5",
"[1516469849] 192.0.2.1 FAIL: failure\n" "[1516469849] 192.0.2.1 FAIL: failure\n"
"[1516469849] 192.0.2.1/24 FAIL: failure\n" "[1516469849] 192.0.2.1/24 FAIL: failure\n"
"[1516469849] 2001:DB8:FF:FF::1 FAIL: failure\n" "[1516469849] 2001:DB8:FF:FF::1 FAIL: failure\n"
"[1516469849] 2001:DB8:FF:FF::1/60 FAIL: failure\n", "[1516469849] 2001:DB8:FF:FF::1/60 FAIL: failure\n",
r"^<SUBNET> FAIL\b" r"^<SUBNET> FAIL\b"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed') self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
self.assertLogged('192.0.2.0/24', '2001:db8:ff:f0::/60', all=True) self.assertLogged('192.0.2.0/24', '2001:db8:ff:f0::/60', all=True)
def testFrmtOutput(self):
# id/ip only:
self.assertTrue(_test_exec('-o', 'id', STR_00, RE_00_ID))
self.assertLogged('kevin')
self.pruneLog()
# row with id :
self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_ID))
self.assertLogged("['kevin'", "'ip4': '192.0.2.0'", "'fid': 'kevin'", all=True)
self.pruneLog()
# row with ip :
self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_USER))
self.assertLogged("['192.0.2.0'", "'ip4': '192.0.2.0'", "'user': 'kevin'", all=True)
self.pruneLog()
# log msg :
self.assertTrue(_test_exec('-o', 'msg', STR_00, RE_00_USER))
self.assertLogged(STR_00)
self.pruneLog()
# item of match (user):
self.assertTrue(_test_exec('-o', 'user', STR_00, RE_00_USER))
self.assertLogged('kevin')
self.pruneLog()
def testWrongFilterFile(self): def testWrongFilterFile(self):
# use test log as filter file to cover eror cases... # use test log as filter file to cover eror cases...
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertFalse(_test_exec(
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILENAME_ZZZ_GEN FILENAME_ZZZ_GEN, FILENAME_ZZZ_GEN
) ))
self.assertFalse(fail2banRegex.start(args))
def _reset(self): def _reset(self):
# reset global warn-counter: # reset global warn-counter:
@ -352,12 +355,11 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronChar(self): def testWronChar(self):
unittest.F2B.SkipIfCfgMissing(stock=True) unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset() self._reset()
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD FILENAME_WRONGCHAR, FILTER_SSHD
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed') self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
self.assertLogged('Error decoding line') self.assertLogged('Error decoding line')
@ -369,14 +371,13 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronCharDebuggex(self): def testWronCharDebuggex(self):
unittest.F2B.SkipIfCfgMissing(stock=True) unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset() self._reset()
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?", "--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--debuggex", "--print-all-matched", "--debuggex", "--print-all-matched",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD, FILENAME_WRONGCHAR, FILTER_SSHD,
r"llinco[^\\]" r"llinco[^\\]"
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged('Error decoding line') self.assertLogged('Error decoding line')
self.assertLogged('Lines: 4 lines, 1 ignored, 2 matched, 1 missed') self.assertLogged('Lines: 4 lines, 1 ignored, 2 matched, 1 missed')
@ -393,16 +394,14 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testExecCmdLine_Direct(self): def testExecCmdLine_Direct(self):
self.assertEqual(_test_exec_command_line( self.assertEqual(_test_exec_command_line(
'-l', 'info', '-l', 'info',
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", STR_00, r"Authentication failure for .*? from <HOST>$"
r"Authentication failure for .*? from <HOST>$"
), 0) ), 0)
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testExecCmdLine_MissFailID(self): def testExecCmdLine_MissFailID(self):
self.assertNotEqual(_test_exec_command_line( self.assertNotEqual(_test_exec_command_line(
'-l', 'info', '-l', 'info',
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", STR_00, r"Authentication failure"
r"Authentication failure"
), 0) ), 0)
self.assertLogged('No failure-id group in ') self.assertLogged('No failure-id group in ')
@ -422,23 +421,21 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testLogtypeSystemdJournal(self): # pragma: no cover def testLogtypeSystemdJournal(self): # pragma: no cover
if not fail2banregex.FilterSystemd: if not fail2banregex.FilterSystemd:
raise unittest.SkipTest('Skip test because no systemd backand available') raise unittest.SkipTest('Skip test because no systemd backand available')
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"systemd-journal", Fail2banRegexTest.FILTER_ZZZ_GEN "systemd-journal", FILTER_ZZZ_GEN
+'[journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",' +'[journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",'
+' failregex="^\x00\x01\x02dummy regex, never match <F-ID>xxx</F-ID>"]' +' failregex="^\x00\x01\x02dummy regex, never match <F-ID>xxx</F-ID>"]'
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged("'logtype': 'journal'") self.assertLogged("'logtype': 'journal'")
self.assertNotLogged("'logtype': 'file'") self.assertNotLogged("'logtype': 'file'")
self.assertLogged('Lines: 0 lines, 0 ignored, 0 matched, 0 missed') self.assertLogged('Lines: 0 lines, 0 ignored, 0 matched, 0 missed')
self.pruneLog() self.pruneLog()
# logtype specified explicitly (should win in filter): # logtype specified explicitly (should win in filter):
(opts, args, fail2banRegex) = _Fail2banRegex( self.assertTrue(_test_exec(
"systemd-journal", Fail2banRegexTest.FILTER_ZZZ_GEN "systemd-journal", FILTER_ZZZ_GEN
+'[logtype=file,' +'[logtype=file,'
+' journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",' +' journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",'
+' failregex="^\x00\x01\x02dummy regex, never match <F-ID>xxx</F-ID>"]' +' failregex="^\x00\x01\x02dummy regex, never match <F-ID>xxx</F-ID>"]'
) ))
self.assertTrue(fail2banRegex.start(args))
self.assertLogged("'logtype': 'file'") self.assertLogged("'logtype': 'file'")
self.assertNotLogged("'logtype': 'journal'") self.assertNotLogged("'logtype': 'journal'")