diff --git a/ChangeLog b/ChangeLog
index 301d5497..bf64534b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -85,7 +85,8 @@ ver. 0.10.5-dev-1 (20??/??/??) - development edition
* `filter.d/sshd.conf`:
- matches `Bad protocol version identification` in `ddos` and `aggressive` modes (gh-2404).
- captures `Disconnecting ...: Change of username or service not allowed` (gh-2239, gh-2279)
- - captures `Disconnected from ... [preauth]` (`extra`/`aggressive` mode and preauth phase only, gh-2239, gh-2279)
+ - captures `Disconnected from ... [preauth]`, preauth phase only, different handling by `extra`
+ (with supplied user only) and `ddos`/`aggressive` mode (gh-2115, gh-2239, gh-2279)
* `filter.d/mysqld-auth.conf`:
- MYSQL 8.0.13 compatibility (log-error-verbosity = 3), log-format contains few additional words
enclosed in brackets after "[Note]" (gh-2314)
@@ -180,6 +181,10 @@ filter = flt[logtype=short]
* partially implements gh-980 (more breakdown safe handling);
* closes gh-1680 (better as large-scale banning implementation with on-demand reban by failure,
at least unless a bulk-ban gets implemented);
+* fail2ban-regex - several enhancements and fixes:
+ - improved usage output (don't put a long help if an error occurs);
+ - new option `--no-check-all` to avoid check of all regex's (first matched only);
+ - new option `-o`, `--out` to set token only provided in output (disables check-all and outputs only expected data).
ver. 0.10.4 (2018/10/04) - ten-four-on-due-date-ten-four
diff --git a/config/filter.d/sshd.conf b/config/filter.d/sshd.conf
index b10f5acb..d764a076 100644
--- a/config/filter.d/sshd.conf
+++ b/config/filter.d/sshd.conf
@@ -25,7 +25,7 @@ __pref = (?:(?:error|fatal): (?:PAM: )?)?
__suff = (?: (?:port \d+|on \S+|\[preauth\])){0,3}\s*
__on_port_opt = (?: (?:port \d+|on \S+)){0,2}
# close by authenticating user:
-__authng_user = (?: authenticating user \S+|.+?)?
+__authng_user = (?: (?:invalid|authenticating) user \S+|.+?)?
# for all possible (also future) forms of "no matching (cipher|mac|MAC|compression method|key exchange method|host key type) found",
# see ssherr.c for all possible SSH_ERR_..._ALG_MATCH errors.
@@ -57,31 +57,32 @@ cmnfailre = ^[aA]uthentication (?:failure|error|failed) for .*
^Disconnecting(?: from)?(?: (?:invalid|authenticating)) user \S+ %(__on_port_opt)s:\s*Change of username or service not allowed:\s*.*\[preauth\]\s*$
^Disconnecting: Too many authentication failures(?: for .+?)?%(__suff)s$
^Received disconnect from %(__on_port_opt)s:\s*11:
- ^Connection closed by%(__authng_user)s -suff-onclosed>
+ -other>
^Accepted \w+ for \S+ from (?:\s|$)
mdre-normal =
# used to differentiate "connection closed" with and without `[preauth]` (fail/nofail cases in ddos mode)
-mdrp-normal-suff-onclosed = (?:%(__suff)s|\s*)$
+mdre-normal-other = ^(Connection closed|Disconnected) (?:by|from)%(__authng_user)s (?:%(__suff)s|\s*)$
mdre-ddos = ^Did not receive identification string from
^Bad protocol version identification '.*' from
^Connection reset by
- ^Connection closed by%(__authng_user)s %(__on_port_opt)s\s+\[preauth\]\s*$
^SSH: Server;Ltype: (?:Authname|Version|Kex);Remote: -\d+;[A-Z]\w+:
^Read from socket failed: Connection reset by peer
-mdrp-ddos-suff-onclosed = %(__on_port_opt)s\s*$
+# same as mdre-normal-other, but as failure (without ) and [preauth] only:
+mdre-ddos-other = ^(Connection closed|Disconnected) (?:by|from)%(__authng_user)s %(__on_port_opt)s\s+\[preauth\]\s*$
mdre-extra = ^Received disconnect from %(__on_port_opt)s:\s*14: No supported authentication methods available
^Unable to negotiate with %(__on_port_opt)s: no matching <__alg_match> found.
^Unable to negotiate a <__alg_match>
^no matching <__alg_match> found:
- ^Disconnected(?: from)?(?: (?:invalid|authenticating)) user \S+ %(__on_port_opt)s \[preauth\]\s*$
-mdrp-extra-suff-onclosed = %(mdrp-normal-suff-onclosed)s
+# part of mdre-ddos-other, but user name is supplied (invalid/authenticating) on [preauth] phase only:
+mdre-extra-other = ^Disconnected(?: from)?(?: (?:invalid|authenticating)) user \S+|.*? %(__on_port_opt)s \[preauth\]\s*$
mdre-aggressive = %(mdre-ddos)s
%(mdre-extra)s
-mdrp-aggressive-suff-onclosed = %(mdrp-ddos-suff-onclosed)s
+# mdre-extra-other is fully included within mdre-ddos-other:
+mdre-aggressive-other = %(mdre-ddos-other)s
cfooterre = ^Connection from
diff --git a/fail2ban/client/fail2banregex.py b/fail2ban/client/fail2banregex.py
index 9279174c..de2bf05a 100644
--- a/fail2ban/client/fail2banregex.py
+++ b/fail2ban/client/fail2banregex.py
@@ -25,7 +25,13 @@ This tools can test regular expressions for "fail2ban".
"""
__author__ = "Fail2Ban Developers"
-__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2014 Yaroslav Halchenko"
+__copyright__ = """Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
+Copyright of modifications held by their respective authors.
+Licensed under the GNU General Public License v2 (GPL).
+
+Written by Cyril Jaquier .
+Many contributions by Yaroslav O. Halchenko, Steven Hiscocks, Sergey G. Brester (sebres)."""
+
__license__ = "GPL"
import getopt
@@ -97,11 +103,12 @@ def dumpNormVersion(*args):
output(normVersion())
sys.exit(0)
-def get_opt_parser():
- # use module docstring for help output
- p = OptionParser(
- usage="%s [OPTIONS] [IGNOREREGEX]\n" % sys.argv[0] + __doc__
- + """
+usage = lambda: "%s [OPTIONS] [IGNOREREGEX]" % sys.argv[0]
+
+class _f2bOptParser(OptionParser):
+ def format_help(self, *args, **kwargs):
+ """ Overwritten format helper with full ussage."""
+ return usage() + __doc__ + """
LOG:
string a string representing a log line
filename path to a log file (/var/log/auth.log)
@@ -114,16 +121,15 @@ REGEX:
IGNOREREGEX:
string a string representing an 'ignoreregex'
filename path to a filter file (filter.d/sshd.conf)
+ """ + OptionParser.format_help(self, *args, **kwargs) + """\n
+Report bugs to https://github.com/fail2ban/fail2ban/issues\n
+""" + __copyright__ + "\n"
-Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
-Copyright of modifications held by their respective authors.
-Licensed under the GNU General Public License v2 (GPL).
-Written by Cyril Jaquier .
-Many contributions by Yaroslav O. Halchenko and Steven Hiscocks.
-
-Report bugs to https://github.com/fail2ban/fail2ban/issues
-""",
+def get_opt_parser():
+ # use module docstring for help output
+ p = _f2bOptParser(
+ usage=usage(),
version="%prog " + version)
p.add_options([
@@ -160,6 +166,10 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
help="Verbose date patterns/regex in output"),
Option("-D", "--debuggex", action='store_true',
help="Produce debuggex.com urls for debugging there"),
+ Option("--no-check-all", action="store_false", dest="checkAllRegex", default=True,
+ help="Disable check for all regex's"),
+ Option("-o", "--out", action="store", dest="out", default=None,
+ help="Set token to print failure information only (row, id, ip, msg, host, ip4, ip6, dns, matches, ...)"),
Option("--print-no-missed", action='store_true',
help="Do not print any missed lines"),
Option("--print-no-ignored", action='store_true',
@@ -234,6 +244,7 @@ class Fail2banRegex(object):
def __init__(self, opts):
# set local protected members from given options:
self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.iteritems()))
+ self._opts = opts
self._maxlines_set = False # so we allow to override maxlines in cmdline
self._datepattern_set = False
self._journalmatch = None
@@ -259,10 +270,12 @@ class Fail2banRegex(object):
self._filter.setUseDns(opts.usedns)
self._filter.returnRawHost = opts.raw
self._filter.checkFindTime = False
- self._filter.checkAllRegex = True
- self._opts = opts
+ self._filter.checkAllRegex = opts.checkAllRegex and not opts.out
self._backend = 'auto'
+ def output(self, line):
+ if not self._opts.out: output(line)
+
def decode_line(self, line):
return FileContainer.decode_line('', self._encoding, line)
@@ -274,14 +287,14 @@ class Fail2banRegex(object):
self._filter.setDatePattern(pattern)
self._datepattern_set = True
if pattern is not None:
- output( "Use datepattern : %s" % (
+ self.output( "Use datepattern : %s" % (
self._filter.getDatePattern()[1], ) )
def setMaxLines(self, v):
if not self._maxlines_set:
self._filter.setMaxLines(int(v))
self._maxlines_set = True
- output( "Use maxlines : %d" % self._filter.getMaxLines() )
+ self.output( "Use maxlines : %d" % self._filter.getMaxLines() )
def setJournalMatch(self, v):
self._journalmatch = v
@@ -297,7 +310,7 @@ class Fail2banRegex(object):
realopts[k] = combopts[k] if k in combopts else reader.get('Definition', k)
except NoOptionError: # pragma: no cover
pass
- output("Real filter options : %r" % realopts)
+ self.output("Real filter options : %r" % realopts)
def readRegex(self, value, regextype):
assert(regextype in ('fail', 'ignore'))
@@ -334,15 +347,15 @@ class Fail2banRegex(object):
if os.path.basename(basedir) == 'filter.d':
basedir = os.path.dirname(basedir)
fltName = os.path.splitext(os.path.basename(fltName))[0]
- output( "Use %11s filter file : %s, basedir: %s" % (regex, fltName, basedir) )
+ self.output( "Use %11s filter file : %s, basedir: %s" % (regex, fltName, basedir) )
else:
## foreign file - readexplicit this file and includes if possible:
- output( "Use %11s file : %s" % (regex, fltName) )
+ self.output( "Use %11s file : %s" % (regex, fltName) )
basedir = None
if not os.path.isabs(fltName): # avoid join with "filter.d" inside FilterReader
fltName = os.path.abspath(fltName)
if fltOpt:
- output( "Use filter options : %r" % fltOpt )
+ self.output( "Use filter options : %r" % fltOpt )
reader = FilterReader(fltName, 'fail2ban-regex-jail', fltOpt, share_config=self.share_config, basedir=basedir)
ret = None
try:
@@ -410,7 +423,7 @@ class Fail2banRegex(object):
return False
else:
- output( "Use %11s line : %s" % (regex, shortstr(value)) )
+ self.output( "Use %11s line : %s" % (regex, shortstr(value)) )
regex_values = {regextype: [RegexStat(value)]}
for regextype, regex_values in regex_values.iteritems():
@@ -506,6 +519,20 @@ class Fail2banRegex(object):
if len(ret) > 0:
assert(not is_ignored)
+ if self._opts.out:
+ if self._opts.out in ('id', 'ip'):
+ for ret in ret:
+ output(ret[1])
+ elif self._opts.out == 'msg':
+ for ret in ret:
+ output('\n'.join(map(lambda v:''.join(v for v in v), ret[3].get('matches'))))
+ elif self._opts.out == 'row':
+ for ret in ret:
+ output('[%r,\t%r,\t%r],' % (ret[1],ret[2],dict((k,v) for k, v in ret[3].iteritems() if k != 'matches')))
+ else:
+ for ret in ret:
+ output(ret[3].get(self._opts.out))
+ continue
self._line_stats.matched += 1
if self._print_all_matched:
self._line_stats.matched_lines.append(line)
@@ -554,6 +581,7 @@ class Fail2banRegex(object):
"to print all %d lines" % (header, ltype, lines) )
def printStats(self):
+ if self._opts.out: return True
output( "" )
output( "Results" )
output( "=======" )
@@ -636,8 +664,8 @@ class Fail2banRegex(object):
if os.path.isfile(cmd_log):
try:
hdlr = open(cmd_log, 'rb')
- output( "Use log file : %s" % cmd_log )
- output( "Use encoding : %s" % self._encoding )
+ self.output( "Use log file : %s" % cmd_log )
+ self.output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr)
except IOError as e: # pragma: no cover
output( e )
@@ -646,8 +674,8 @@ class Fail2banRegex(object):
if not FilterSystemd:
output( "Error: systemd library not found. Exiting..." )
return False
- output( "Use systemd journal" )
- output( "Use encoding : %s" % self._encoding )
+ self.output( "Use systemd journal" )
+ self.output( "Use encoding : %s" % self._encoding )
backend, beArgs = extractOptions(cmd_log)
flt = FilterSystemd(None, **beArgs)
flt.setLogEncoding(self._encoding)
@@ -656,23 +684,23 @@ class Fail2banRegex(object):
self.setDatePattern(None)
if journalmatch:
flt.addJournalMatch(journalmatch)
- output( "Use journal match : %s" % " ".join(journalmatch) )
+ self.output( "Use journal match : %s" % " ".join(journalmatch) )
test_lines = journal_lines_gen(flt, myjournal)
else:
# if single line parsing (without buffering)
if self._filter.getMaxLines() <= 1:
- output( "Use single line : %s" % shortstr(cmd_log.replace("\n", r"\n")) )
+ self.output( "Use single line : %s" % shortstr(cmd_log.replace("\n", r"\n")) )
test_lines = [ cmd_log ]
else: # multi line parsing (with buffering)
test_lines = cmd_log.split("\n")
- output( "Use multi line : %s line(s)" % len(test_lines) )
+ self.output( "Use multi line : %s line(s)" % len(test_lines) )
for i, l in enumerate(test_lines):
if i >= 5:
- output( "| ..." ); break
- output( "| %2.2s: %s" % (i+1, shortstr(l)) )
- output( "`-" )
+ self.output( "| ..." ); break
+ self.output( "| %2.2s: %s" % (i+1, shortstr(l)) )
+ self.output( "`-" )
- output( "" )
+ self.output( "" )
self.process(test_lines)
@@ -695,14 +723,15 @@ def exec_command_line(*args):
if not len(args) in (2, 3):
errors.append("ERROR: provide both and .")
if errors:
- sys.stderr.write("\n".join(errors) + "\n\n")
parser.print_help()
+ sys.stderr.write("\n" + "\n".join(errors) + "\n")
sys.exit(255)
- output( "" )
- output( "Running tests" )
- output( "=============" )
- output( "" )
+ if not opts.out:
+ output( "" )
+ output( "Running tests" )
+ output( "=============" )
+ output( "" )
# Log level (default critical):
opts.log_level = str2LogLevel(opts.log_level)
diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py
index d654fe9e..998fe298 100644
--- a/fail2ban/server/filter.py
+++ b/fail2ban/server/filter.py
@@ -887,6 +887,7 @@ class Filter(JailThread):
fid = failRegex.getFailID()
host = fid
cidr = IPAddr.CIDR_RAW
+ raw = True
# if mlfid case (not failure):
if host is None:
if ll <= 7: logSys.log(7, "No failure-id by mlfid %r in regex %s: %s",
diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py
index 05db2a24..c09c4171 100644
--- a/fail2ban/tests/fail2banregextestcase.py
+++ b/fail2ban/tests/fail2banregextestcase.py
@@ -52,6 +52,10 @@ def _Fail2banRegex(*args):
logSys.setLevel(str2LogLevel(opts.log_level))
return (opts, args, Fail2banRegex(opts))
+def _test_exec(*args):
+ (opts, args, fail2banRegex) = _Fail2banRegex(*args)
+ return fail2banRegex.start(args)
+
class ExitException(Exception):
def __init__(self, code):
self.code = code
@@ -76,23 +80,27 @@ def _test_exec_command_line(*args):
sys.stderr = _org['stderr']
return _exit_code
+STR_00 = "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
+
+RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) "
+RE_00_ID = r"Authentication failure for .*? from $"
+RE_00_USER = r"Authentication failure for .*? from $"
+
+FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
+FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
+FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
+
+FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
+FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
+FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
+FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
+
+FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
+FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
+
class Fail2banRegexTest(LogCaptureTestCase):
- RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) "
-
- FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
- FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
- FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
-
- FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
- FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
- FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
- FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
-
- FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
- FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
-
def setUp(self):
"""Call before every test case."""
LogCaptureTestCase.setUp(self)
@@ -104,57 +112,50 @@ class Fail2banRegexTest(LogCaptureTestCase):
tearDownMyTime()
def testWrongRE(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertFalse(_test_exec(
"test", r".** from $"
- )
- self.assertFalse(fail2banRegex.start(args))
+ ))
self.assertLogged("Unable to compile regular expression")
def testWrongIngnoreRE(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertFalse(_test_exec(
"--datepattern", "{^LN-BEG}EPOCH",
"test", r".*? from $", r".**"
- )
- self.assertFalse(fail2banRegex.start(args))
+ ))
self.assertLogged("Unable to compile regular expression")
def testDirectFound(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-no-missed",
- "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
+ STR_00,
r"Authentication failure for .*? from $"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testDirectNotFound(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--print-all-missed",
- "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
+ STR_00,
r"XYZ from $"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed')
def testDirectIgnored(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--print-all-ignored",
- "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
+ STR_00,
r"Authentication failure for .*? from $",
r"kevin from 192.0.2.0$"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed')
def testDirectRE_1(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched",
- Fail2banRegexTest.FILENAME_01,
- Fail2banRegexTest.RE_00
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_01, RE_00
+ ))
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
self.assertLogged('Error decoding line');
@@ -164,81 +165,78 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10')
def testDirectRE_1raw(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw",
- Fail2banRegexTest.FILENAME_01,
- Fail2banRegexTest.RE_00
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_01, RE_00
+ ))
self.assertLogged('Lines: 19 lines, 0 ignored, 16 matched, 3 missed')
def testDirectRE_1raw_noDns(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--usedns=no",
- Fail2banRegexTest.FILENAME_01,
- Fail2banRegexTest.RE_00
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_01, RE_00
+ ))
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
+ # usage of \S+ causes raw handling automatically:
+ self.pruneLog()
+ self.assertTrue(_test_exec(
+ "-d", "^Epoch",
+ "1490349000 test failed.dns.ch", "^\s*test \S+"
+ ))
+ self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed', all=True)
+ self.assertNotLogged('Unable to find a corresponding IP address')
def testDirectRE_2(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched",
- Fail2banRegexTest.FILENAME_02,
- Fail2banRegexTest.RE_00
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_02, RE_00
+ ))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
def testVerbose(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--timezone", "UTC+0200",
"--verbose", "--verbose-date", "--print-no-missed",
- Fail2banRegexTest.FILENAME_02,
- Fail2banRegexTest.RE_00
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_02, RE_00
+ ))
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
self.assertLogged('141.3.81.106 Sun Aug 14 11:53:59 2005')
self.assertLogged('141.3.81.106 Sun Aug 14 11:54:59 2005')
def testVerboseFullSshd(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"-v", "--verbose-date", "--print-all-matched", "--print-all-ignored",
"-c", CONFIG_DIR,
- Fail2banRegexTest.FILENAME_SSHD, "sshd"
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_SSHD, "sshd"
+ ))
# test failure line and not-failure lines both presents:
self.assertLogged("[29116]: User root not allowed because account is locked",
"[29116]: Received disconnect from 1.2.3.4", all=True)
self.pruneLog()
# show real options:
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"-vv", "-c", CONFIG_DIR,
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.1",
"sshd[logtype=short]"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
# tet logtype is specified and set in real options:
self.assertLogged("Real filter options :", "'logtype': 'short'", all=True)
self.assertNotLogged("'logtype': 'file'", "'logtype': 'journal'", all=True)
def testFastSshd(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched",
"-c", CONFIG_DIR,
- Fail2banRegexTest.FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]"
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]"
+ ))
# test failure line and all not-failure lines presents:
self.assertLogged(
"[29116]: Connection from 192.0.2.4",
@@ -247,93 +245,107 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testMultilineSshd(self):
# by the way test of missing lines by multiline in `for bufLine in orgLineBuffer[int(fullBuffer):]`
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched", "--print-all-missed",
- "-c", os.path.dirname(Fail2banRegexTest.FILTER_ZZZ_SSHD),
- Fail2banRegexTest.FILENAME_ZZZ_SSHD, os.path.basename(Fail2banRegexTest.FILTER_ZZZ_SSHD)
- )
- self.assertTrue(fail2banRegex.start(args))
+ "-c", os.path.dirname(FILTER_ZZZ_SSHD),
+ FILENAME_ZZZ_SSHD, os.path.basename(FILTER_ZZZ_SSHD)
+ ))
# test "failure" line presents (2nd part only, because multiline fewer precise):
self.assertLogged(
"[29116]: Received disconnect from 192.0.2.4", all=True)
def testFullGeneric(self):
# by the way test of ignoreregex (specified in filter file)...
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
- Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN+"[mode=test]"
- )
- self.assertTrue(fail2banRegex.start(args))
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
+ FILENAME_ZZZ_GEN, FILTER_ZZZ_GEN+"[mode=test]"
+ ))
def testDirectMultilineBuf(self):
# test it with some pre-lines also to cover correct buffer scrolling (all multi-lines printed):
for preLines in (0, 20):
self.pruneLog("[test-phase %s]" % preLines)
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--maxlines", "5",
("1490349000 TEST-NL\n"*preLines) +
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$^\s*HOST \s*$"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: %s lines, 0 ignored, 2 matched, %s missed' % (preLines+4, preLines+2))
# both matched lines were printed:
self.assertLogged("| 1490349000 FAIL", "| 1490349001 HOST 192.0.2.34", all=True)
def testDirectMultilineBufDebuggex(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--debuggex", "--print-all-matched", "--maxlines", "5",
"1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
r"^\s*FAIL\s*$^\s*HOST \s*$"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
# the sequence in args-dict is currently undefined (so can be 1st argument)
self.assertLogged("&flags=m", "?flags=m")
def testSinglelineWithNLinContent(self):
#
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"--usedns", "no", "-d", "^Epoch", "--print-all-matched",
"1490349000 FAIL: failure\nhost: 192.0.2.35",
r"^\s*FAIL:\s*.*\nhost:\s+$"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testRegexEpochPatterns(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"-r", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5",
"[1516469849] 192.0.2.1 FAIL: failure\n"
"[1516469849551] 192.0.2.2 FAIL: failure\n"
"[1516469849551000] 192.0.2.3 FAIL: failure\n"
"[1516469849551.000] 192.0.2.4 FAIL: failure",
r"^ FAIL\b"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
def testRegexSubnet(self):
- (opts, args, fail2banRegex) = _Fail2banRegex(
+ self.assertTrue(_test_exec(
"-vv", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5",
"[1516469849] 192.0.2.1 FAIL: failure\n"
"[1516469849] 192.0.2.1/24 FAIL: failure\n"
"[1516469849] 2001:DB8:FF:FF::1 FAIL: failure\n"
"[1516469849] 2001:DB8:FF:FF::1/60 FAIL: failure\n",
r"^ FAIL\b"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
self.assertLogged('192.0.2.0/24', '2001:db8:ff:f0::/60', all=True)
+ def testFrmtOutput(self):
+ # id/ip only:
+ self.assertTrue(_test_exec('-o', 'id', STR_00, RE_00_ID))
+ self.assertLogged('kevin')
+ self.pruneLog()
+ # row with id :
+ self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_ID))
+ self.assertLogged("['kevin'", "'ip4': '192.0.2.0'", "'fid': 'kevin'", all=True)
+ self.pruneLog()
+ # row with ip :
+ self.assertTrue(_test_exec('-o', 'row', STR_00, RE_00_USER))
+ self.assertLogged("['192.0.2.0'", "'ip4': '192.0.2.0'", "'user': 'kevin'", all=True)
+ self.pruneLog()
+ # log msg :
+ self.assertTrue(_test_exec('-o', 'msg', STR_00, RE_00_USER))
+ self.assertLogged(STR_00)
+ self.pruneLog()
+ # item of match (user):
+ self.assertTrue(_test_exec('-o', 'user', STR_00, RE_00_USER))
+ self.assertLogged('kevin')
+ self.pruneLog()
+
def testWrongFilterFile(self):
# use test log as filter file to cover eror cases...
- (opts, args, fail2banRegex) = _Fail2banRegex(
- Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILENAME_ZZZ_GEN
- )
- self.assertFalse(fail2banRegex.start(args))
+ self.assertFalse(_test_exec(
+ FILENAME_ZZZ_GEN, FILENAME_ZZZ_GEN
+ ))
def _reset(self):
# reset global warn-counter:
@@ -343,12 +355,11 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronChar(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
- Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
- )
- self.assertTrue(fail2banRegex.start(args))
+ FILENAME_WRONGCHAR, FILTER_SSHD
+ ))
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
self.assertLogged('Error decoding line')
@@ -360,14 +371,13 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronCharDebuggex(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "-l", "notice", # put down log-level, because of too many debug-messages
+ self.assertTrue(_test_exec(
+ "-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--debuggex", "--print-all-matched",
- Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD,
+ FILENAME_WRONGCHAR, FILTER_SSHD,
r"llinco[^\\]"
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged('Error decoding line')
self.assertLogged('Lines: 4 lines, 1 ignored, 2 matched, 1 missed')
@@ -384,16 +394,14 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testExecCmdLine_Direct(self):
self.assertEqual(_test_exec_command_line(
'-l', 'info',
- "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
- r"Authentication failure for .*? from $"
+ STR_00, r"Authentication failure for .*? from $"
), 0)
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
def testExecCmdLine_MissFailID(self):
self.assertNotEqual(_test_exec_command_line(
'-l', 'info',
- "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
- r"Authentication failure"
+ STR_00, r"Authentication failure"
), 0)
self.assertLogged('No failure-id group in ')
@@ -413,23 +421,21 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testLogtypeSystemdJournal(self): # pragma: no cover
if not fail2banregex.FilterSystemd:
raise unittest.SkipTest('Skip test because no systemd backand available')
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "systemd-journal", Fail2banRegexTest.FILTER_ZZZ_GEN
+ self.assertTrue(_test_exec(
+ "systemd-journal", FILTER_ZZZ_GEN
+'[journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",'
+' failregex="^\x00\x01\x02dummy regex, never match xxx"]'
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged("'logtype': 'journal'")
self.assertNotLogged("'logtype': 'file'")
self.assertLogged('Lines: 0 lines, 0 ignored, 0 matched, 0 missed')
self.pruneLog()
# logtype specified explicitly (should win in filter):
- (opts, args, fail2banRegex) = _Fail2banRegex(
- "systemd-journal", Fail2banRegexTest.FILTER_ZZZ_GEN
+ self.assertTrue(_test_exec(
+ "systemd-journal", FILTER_ZZZ_GEN
+'[logtype=file,'
+' journalmatch="SYSLOG_IDENTIFIER=\x01\x02dummy\x02\x01",'
+' failregex="^\x00\x01\x02dummy regex, never match xxx"]'
- )
- self.assertTrue(fail2banRegex.start(args))
+ ))
self.assertLogged("'logtype': 'file'")
self.assertNotLogged("'logtype': 'journal'")
diff --git a/fail2ban/tests/files/logs/sshd b/fail2ban/tests/files/logs/sshd
index efedb423..a5f64939 100644
--- a/fail2ban/tests/files/logs/sshd
+++ b/fail2ban/tests/files/logs/sshd
@@ -308,6 +308,9 @@ Mar 15 09:21:01 host sshd[2717]: Connection closed by 192.0.2.212 [preauth]
# failJSON: { "time": "2005-03-15T09:21:02", "match": true , "host": "192.0.2.212", "desc": "DDOS mode causes failure on close within preauth stage" }
Mar 15 09:21:02 host sshd[2717]: Connection closed by 192.0.2.212 [preauth]
+# failJSON: { "time": "2005-07-18T17:19:11", "match": true , "host": "192.0.2.4", "desc": "ddos: disconnect on preauth phase, gh-2115" }
+Jul 18 17:19:11 srv sshd[2101]: Disconnected from 192.0.2.4 port 36985 [preauth]
+
# filterOptions: [{"mode": "extra"}, {"mode": "aggressive"}]
# several other cases from gh-864: