test coverage

pull/1698/head
sebres 8 years ago
parent d2a3d093c6
commit e4a265c75f

@ -61,7 +61,7 @@ def debuggexURL(sample, regex):
'flavor': 'python' })
return 'https://www.debuggex.com/?' + q
def output(args):
def output(args): # pragma: no cover (overriden in test-cases)
print(args)
def shortstr(s, l=53):
@ -278,15 +278,15 @@ class Fail2banRegex(object):
value = os.path.splitext(os.path.basename(value))[0]
output( "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) )
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir)
if not reader.read():
if not reader.read(): # pragma: no cover
output( "ERROR: failed to load filter %s" % value )
return False
else:
else: # pragma: no cover
## foreign file - readexplicit this file and includes if possible:
output( "Use %11s file : %s" % (regex, value) )
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config)
reader.setBaseDir(None)
if not reader.readexplicit():
if not reader.readexplicit():
output( "ERROR: failed to read %s" % value )
return False
reader.getOptions(None)
@ -298,7 +298,7 @@ class Fail2banRegex(object):
optval = opt[3]
elif opt[0] == 'set':
optval = opt[3:]
else:
else: # pragma: no cover
continue
try:
if opt[2] == "prefregex":
@ -322,7 +322,7 @@ class Fail2banRegex(object):
elif opt[2] == "datepattern":
for optval in optval:
self.setDatePattern(optval)
elif opt[2] == "addjournalmatch":
elif opt[2] == "addjournalmatch": # pragma: no cover
if self._opts.journalmatch is None:
self.setJournalMatch(optval)
except ValueError as e: # pragma: no cover
@ -350,7 +350,7 @@ class Fail2banRegex(object):
if ret is not None:
found = True
regex = self._ignoreregex[ret].inc()
except RegexException as e:
except RegexException as e: # pragma: no cover
output( 'ERROR: %s' % e )
return False
return found
@ -368,7 +368,7 @@ class Fail2banRegex(object):
regex = self._failregex[match[0]]
regex.inc()
regex.appendIP(match)
except RegexException as e:
except RegexException as e: # pragma: no cover
output( 'ERROR: %s' % e )
return False
for bufLine in orgLineBuffer[int(fullBuffer):]:
@ -520,9 +520,9 @@ class Fail2banRegex(object):
cmd_log, cmd_regex = args[:2]
try:
if not self.readRegex(cmd_regex, 'fail'):
if not self.readRegex(cmd_regex, 'fail'): # pragma: no cover
return False
if len(args) == 3 and not self.readRegex(args[2], 'ignore'):
if len(args) == 3 and not self.readRegex(args[2], 'ignore'): # pragma: no cover
return False
except RegexException as e:
output( 'ERROR: %s' % e )
@ -534,7 +534,7 @@ class Fail2banRegex(object):
output( "Use log file : %s" % cmd_log )
output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr)
except IOError as e:
except IOError as e: # pragma: no cover
output( e )
return False
elif cmd_log.startswith("systemd-journal"): # pragma: no cover

@ -0,0 +1,76 @@
# Fail2Ban obsolete multiline example resp. test filter (previously sshd.conf)
#
[INCLUDES]
# Read common prefixes. If any customizations available -- read them from
# common.local
before = ../../../../config/filter.d/common.conf
[DEFAULT]
_daemon = sshd
# optional prefix (logged from several ssh versions) like "error: ", "error: PAM: " or "fatal: "
__pref = (?:(?:error|fatal): (?:PAM: )?)?
# optional suffix (logged from several ssh versions) like " [preauth]"
__suff = (?: \[preauth\])?\s*
__on_port_opt = (?: port \d+)?(?: on \S+(?: port \d+)?)?
# single line prefix:
__prefix_line_sl = %(__prefix_line)s%(__pref)s
# multi line prefixes (for first and second lines):
__prefix_line_ml1 = (?P<__prefix>%(__prefix_line)s)%(__pref)s
__prefix_line_ml2 = %(__suff)s$<SKIPLINES>^(?P=__prefix)%(__pref)s
mode = %(normal)s
normal = ^%(__prefix_line_sl)s[aA]uthentication (?:failure|error|failed) for .* from <HOST>( via \S+)?\s*%(__suff)s$
^%(__prefix_line_sl)sUser not known to the underlying authentication module for .* from <HOST>\s*%(__suff)s$
^%(__prefix_line_sl)sFailed \S+ for (?P<cond_inv>invalid user )?(?P<user>(?P<cond_user>\S+)|(?(cond_inv)(?:(?! from ).)*?|[^:]+)) from <HOST>%(__on_port_opt)s(?: ssh\d*)?(?(cond_user): |(?:(?:(?! from ).)*)$)
^%(__prefix_line_sl)sROOT LOGIN REFUSED.* FROM <HOST>\s*%(__suff)s$
^%(__prefix_line_sl)s[iI](?:llegal|nvalid) user .*? from <HOST>%(__on_port_opt)s\s*$
^%(__prefix_line_sl)sUser .+ from <HOST> not allowed because not listed in AllowUsers\s*%(__suff)s$
^%(__prefix_line_sl)sUser .+ from <HOST> not allowed because listed in DenyUsers\s*%(__suff)s$
^%(__prefix_line_sl)sUser .+ from <HOST> not allowed because not in any group\s*%(__suff)s$
^%(__prefix_line_sl)srefused connect from \S+ \(<HOST>\)\s*%(__suff)s$
^%(__prefix_line_sl)sReceived disconnect from <HOST>%(__on_port_opt)s:\s*3: .*: Auth fail%(__suff)s$
^%(__prefix_line_sl)sUser .+ from <HOST> not allowed because a group is listed in DenyGroups\s*%(__suff)s$
^%(__prefix_line_sl)sUser .+ from <HOST> not allowed because none of user's groups are listed in AllowGroups\s*%(__suff)s$
^%(__prefix_line_sl)spam_unix\(sshd:auth\):\s+authentication failure;\s*logname=\S*\s*uid=\d*\s*euid=\d*\s*tty=\S*\s*ruser=\S*\s*rhost=<HOST>\s.*%(__suff)s$
^%(__prefix_line_sl)s(error: )?maximum authentication attempts exceeded for .* from <HOST>%(__on_port_opt)s(?: ssh\d*)? \[preauth\]$
^%(__prefix_line_ml1)sUser .+ not allowed because account is locked%(__prefix_line_ml2)sReceived disconnect from <HOST>: 11: .+%(__suff)s$
^%(__prefix_line_ml1)sDisconnecting: Too many authentication failures for .+?%(__prefix_line_ml2)sConnection closed by <HOST>%(__suff)s$
^%(__prefix_line_ml1)sConnection from <HOST>%(__on_port_opt)s%(__prefix_line_ml2)sDisconnecting: Too many authentication failures for .+%(__suff)s$
ddos = ^%(__prefix_line_sl)sDid not receive identification string from <HOST>%(__suff)s$
^%(__prefix_line_sl)sReceived disconnect from <HOST>%(__on_port_opt)s:\s*14: No supported authentication methods available%(__suff)s$
^%(__prefix_line_sl)sUnable to negotiate with <HOST>%(__on_port_opt)s: no matching (?:cipher|key exchange method) found.
^%(__prefix_line_ml1)sConnection from <HOST>%(__on_port_opt)s%(__prefix_line_ml2)sUnable to negotiate a (?:cipher|key exchange method)%(__suff)s$
^%(__prefix_line_ml1)sSSH: Server;Ltype: (?:Authname|Version|Kex);Remote: <HOST>-\d+;[A-Z]\w+:.*%(__prefix_line_ml2)sRead from socket failed: Connection reset by peer%(__suff)s$
aggressive = %(normal)s
%(ddos)s
[Definition]
failregex = %(mode)s
ignoreregex =
# "maxlines" is number of log lines to buffer for multi-line regex searches
maxlines = 10
journalmatch = _SYSTEMD_UNIT=sshd.service + _COMM=sshd
datepattern = {^LN-BEG}
# DEV Notes:
#
# "Failed \S+ for .*? from <HOST>..." failregex uses non-greedy catch-all because
# it is coming before use of <HOST> which is not hard-anchored at the end as well,
# and later catch-all's could contain user-provided input, which need to be greedily
# matched away first.
#
# Author: Cyril Jaquier, Yaroslav Halchenko, Petr Voralek, Daniel Black

@ -38,6 +38,7 @@ def _test_output(*args):
fail2banregex.output = _test_output
TEST_CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config")
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
DEV_NULL = None
@ -85,6 +86,11 @@ class Fail2banRegexTest(LogCaptureTestCase):
FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
def setUp(self):
"""Call before every test case."""
@ -210,6 +216,39 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged("[29116]: User root not allowed because account is locked",
"[29116]: Received disconnect from 1.2.3.4", all=True)
def testFastSshd(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched",
Fail2banRegexTest.FILENAME_ZZZ_SSHD, Fail2banRegexTest.FILTER_SSHD
)
self.assertTrue(fail2banRegex.start(args))
# test failure line and all not-failure lines presents:
self.assertLogged(
"[29116]: Connection from 192.0.2.4",
"[29116]: User root not allowed because account is locked",
"[29116]: Received disconnect from 192.0.2.4", all=True)
def testMultilineSshd(self):
# by the way test of missing lines by multiline in `for bufLine in orgLineBuffer[int(fullBuffer):]`
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
"--print-all-matched", "--print-all-missed",
Fail2banRegexTest.FILENAME_ZZZ_SSHD, Fail2banRegexTest.FILTER_ZZZ_SSHD
)
self.assertTrue(fail2banRegex.start(args))
# test "failure" line presents (2nd part only, because multiline fewer precise):
self.assertLogged(
"[29116]: Received disconnect from 192.0.2.4", all=True)
def testFullGeneric(self):
# by the way test of ignoreregex (specified in filter file)...
(opts, args, fail2banRegex) = _Fail2banRegex(
"-l", "notice", # put down log-level, because of too many debug-messages
Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN
)
self.assertTrue(fail2banRegex.start(args))
def _reset(self):
# reset global warn-counter:
from ..server.filter import _decode_line_warn

@ -0,0 +1,2 @@
# test sshd file:
# addFILE: "sshd"

@ -0,0 +1,4 @@
Apr 27 13:02:01 host sshd[29116]: Connection from 192.0.2.4 port 55555
Apr 27 13:02:02 host sshd[29116]: User root not allowed because account is locked
Apr 27 13:02:03 host sshd[29116]: input_userauth_request: invalid user root [preauth]
Apr 27 13:02:04 host sshd[29116]: Received disconnect from 192.0.2.4: 11: Normal Shutdown, Thank you for playing [preauth]
Loading…
Cancel
Save