From 7e8575cc5609f4ebd8a2503bc79ebac078becddd Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 13 Oct 2016 17:09:49 +0200 Subject: [PATCH 1/2] tests of fail2ban-regex extended to cover exec_command_line also; Closes #1573 --- fail2ban/client/fail2banregex.py | 25 ++++++---------- fail2ban/tests/fail2banregextestcase.py | 38 ++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/fail2ban/client/fail2banregex.py b/fail2ban/client/fail2banregex.py index c05006f7..2d45c31d 100644 --- a/fail2ban/client/fail2banregex.py +++ b/fail2ban/client/fail2banregex.py @@ -136,13 +136,12 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues "\"systemd-journal\" only"), Option('-l', "--log-level", dest="log_level", - default=None, + default='critical', help="Log level for the Fail2Ban logger to use"), Option('-v', '--verbose', action="count", dest="verbose", - default=None, + default=0, help="Increase verbosity"), Option("--verbosity", action="store", dest="verbose", type=int, - default=None, help="Set numerical level of verbosity (0..4)"), Option("-D", "--debuggex", action='store_true', help="Produce debuggex.com urls for debugging there"), @@ -556,9 +555,9 @@ class Fail2banRegex(object): return True -def exec_command_line(): +def exec_command_line(*args): parser = get_opt_parser() - (opts, args) = parser.parse_args() + (opts, args) = parser.parse_args(*args) if opts.print_no_missed and opts.print_all_missed: sys.stderr.write("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.\n\n") parser.print_help() @@ -572,27 +571,21 @@ def exec_command_line(): if not len(args) in (2, 3): sys.stderr.write("ERROR: provide both and .\n\n") parser.print_help() - return False + sys.exit(-1) output( "" ) output( "Running tests" ) output( "=============" ) output( "" ) - # TODO: taken from -testcases -- move common functionality somewhere - if opts.log_level is not None: - # so we had explicit settings - logSys.setLevel(str2LogLevel(opts.log_level)) - else: - # suppress the logging but it would leave unittests' progress dots - # ticking, unless like with '-l critical' which would be silent - # unless error occurs - logSys.setLevel(logging.CRITICAL) + # Log level (default critical): + opts.log_level = str2LogLevel(opts.log_level) + logSys.setLevel(opts.log_level) # Add the default logging handler stdout = logging.StreamHandler(sys.stdout) - fmt = '%(levelname)-1.1s: %(message)s' if opts.verbose <= 1 else '%(message)s' + fmt = '%(levelname)-1.1s: %(message)s' if opts.verbose <= 1 else ' %(message)s' if opts.log_traceback: Formatter = FormatterWithTraceBack diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py index 0154e5af..a3d3cfb5 100644 --- a/fail2ban/tests/fail2banregextestcase.py +++ b/fail2ban/tests/fail2banregextestcase.py @@ -24,9 +24,10 @@ __copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Con __license__ = "GPL" import os +import sys from ..client import fail2banregex -from ..client.fail2banregex import Fail2banRegex, get_opt_parser, output +from ..client.fail2banregex import Fail2banRegex, get_opt_parser, exec_command_line, output from .utils import setUpMyTime, tearDownMyTime, LogCaptureTestCase, logSys from .utils import CONFIG_DIR @@ -39,12 +40,38 @@ fail2banregex.output = _test_output TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") +DEV_NULL = None def _Fail2banRegex(*args): parser = get_opt_parser() (opts, args) = parser.parse_args(list(args)) return (opts, args, Fail2banRegex(opts)) +class ExitException(Exception): + def __init__(self, code): + self.code = code + self.msg = 'Exit with code: %s' % code + +def _test_exec_command_line(*args): + def _exit(code=0): + raise ExitException(code) + global DEV_NULL + _org = {'exit': sys.exit, 'stdout': sys.stdout, 'stderr': sys.stderr} + _exit_code = 0 + sys.exit = _exit + if not DEV_NULL: DEV_NULL = open(os.devnull, "w") + sys.stderr = sys.stdout = DEV_NULL + try: + exec_command_line(list(args)) + except ExitException as e: + _exit_code = e.code + finally: + sys.exit = _org['exit'] + sys.stdout = _org['stdout'] + sys.stderr = _org['stderr'] + return _exit_code + + class Fail2banRegexTest(LogCaptureTestCase): RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) " @@ -184,4 +211,13 @@ class Fail2banRegexTest(LogCaptureTestCase): self.assertLogged('https://') + def testExecCmdLine_Usage(self): + self.assertNotEqual(_test_exec_command_line(), 0) + def testExecCmdLine_Direct(self): + self.assertEqual(_test_exec_command_line( + '-l', 'info', + "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", + r"Authentication failure for .*? from $" + ), 0) + self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') From 44f93bfbff0adbcf8e85b6f1b6be315d166cb063 Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 13 Oct 2016 18:24:55 +0200 Subject: [PATCH 2/2] increase coverage, better test and output of errors --- fail2ban/client/fail2banregex.py | 39 ++++++++++++------------- fail2ban/tests/fail2banregextestcase.py | 12 ++++++-- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/fail2ban/client/fail2banregex.py b/fail2ban/client/fail2banregex.py index 2d45c31d..4096ac4e 100644 --- a/fail2ban/client/fail2banregex.py +++ b/fail2ban/client/fail2banregex.py @@ -40,7 +40,7 @@ from optparse import OptionParser, Option from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError -try: +try: # pragma: no cover from systemd import journal from ..server.filtersystemd import FilterSystemd except ImportError: @@ -80,7 +80,7 @@ def pprint_list(l, header=None): s = '' output( s + "| " + "\n| ".join(l) + '\n`-' ) -def journal_lines_gen(myjournal): +def journal_lines_gen(myjournal): # pragma: no cover while True: try: entry = myjournal.get_next() @@ -342,7 +342,7 @@ class Fail2banRegex(object): found = True regex = self._ignoreregex[ret].inc() except RegexException as e: - output( e ) + output( 'ERROR: %s' % e ) return False return found @@ -359,10 +359,7 @@ class Fail2banRegex(object): regex.inc() regex.appendIP(match) except RegexException as e: - output( e ) - return False - except IndexError: - output( "Sorry, but no found in regex" ) + output( 'ERROR: %s' % e ) return False for bufLine in orgLineBuffer[int(fullBuffer):]: if bufLine not in self._filter._Filter__lineBuffer: @@ -508,10 +505,13 @@ class Fail2banRegex(object): cmd_log, cmd_regex = args[:2] - if not self.readRegex(cmd_regex, 'fail'): - return False - - if len(args) == 3 and not self.readRegex(args[2], 'ignore'): + try: + if not self.readRegex(cmd_regex, 'fail'): + return False + if len(args) == 3 and not self.readRegex(args[2], 'ignore'): + return False + except RegexException as e: + output( 'ERROR: %s' % e ) return False if os.path.isfile(cmd_log): @@ -558,18 +558,17 @@ class Fail2banRegex(object): def exec_command_line(*args): parser = get_opt_parser() (opts, args) = parser.parse_args(*args) - if opts.print_no_missed and opts.print_all_missed: - sys.stderr.write("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.\n\n") - parser.print_help() - sys.exit(-1) - if opts.print_no_ignored and opts.print_all_ignored: - sys.stderr.write("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.\n\n") - parser.print_help() - sys.exit(-1) + errors = [] + if opts.print_no_missed and opts.print_all_missed: # pragma: no cover + errors.append("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.") + if opts.print_no_ignored and opts.print_all_ignored: # pragma: no cover + errors.append("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.") # We need 2 or 3 parameters if not len(args) in (2, 3): - sys.stderr.write("ERROR: provide both and .\n\n") + errors.append("ERROR: provide both and .") + if errors: + sys.stderr.write("\n".join(errors) + "\n\n") parser.print_help() sys.exit(-1) diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py index a3d3cfb5..4445fe6b 100644 --- a/fail2ban/tests/fail2banregextestcase.py +++ b/fail2ban/tests/fail2banregextestcase.py @@ -96,14 +96,14 @@ class Fail2banRegexTest(LogCaptureTestCase): (opts, args, fail2banRegex) = _Fail2banRegex( "test", r".** from $" ) - self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args)) + self.assertFalse(fail2banRegex.start(opts, args)) self.assertLogged("Unable to compile regular expression") def testWrongIngnoreRE(self): (opts, args, fail2banRegex) = _Fail2banRegex( "test", r".*? from $", r".**" ) - self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args)) + self.assertFalse(fail2banRegex.start(opts, args)) self.assertLogged("Unable to compile regular expression") def testDirectFound(self): @@ -221,3 +221,11 @@ class Fail2banRegexTest(LogCaptureTestCase): r"Authentication failure for .*? from $" ), 0) self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') + + def testExecCmdLine_MissFailID(self): + self.assertNotEqual(_test_exec_command_line( + '-l', 'info', + "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", + r"Authentication failure" + ), 0) + self.assertLogged('No failure-id group in ')