mirror of https://github.com/fail2ban/fail2ban
Merge pull request #1576 from sebres/_0.10/fail2ban-regex-coverage
tests of fail2ban-regex extended to cover exec_command_line alsopull/1580/head
commit
77f2dcfdb6
|
@ -40,7 +40,7 @@ from optparse import OptionParser, Option
|
||||||
|
|
||||||
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
|
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
|
||||||
|
|
||||||
try:
|
try: # pragma: no cover
|
||||||
from systemd import journal
|
from systemd import journal
|
||||||
from ..server.filtersystemd import FilterSystemd
|
from ..server.filtersystemd import FilterSystemd
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -80,7 +80,7 @@ def pprint_list(l, header=None):
|
||||||
s = ''
|
s = ''
|
||||||
output( s + "| " + "\n| ".join(l) + '\n`-' )
|
output( s + "| " + "\n| ".join(l) + '\n`-' )
|
||||||
|
|
||||||
def journal_lines_gen(myjournal):
|
def journal_lines_gen(myjournal): # pragma: no cover
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
entry = myjournal.get_next()
|
entry = myjournal.get_next()
|
||||||
|
@ -136,13 +136,12 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
|
||||||
"\"systemd-journal\" only"),
|
"\"systemd-journal\" only"),
|
||||||
Option('-l', "--log-level",
|
Option('-l', "--log-level",
|
||||||
dest="log_level",
|
dest="log_level",
|
||||||
default=None,
|
default='critical',
|
||||||
help="Log level for the Fail2Ban logger to use"),
|
help="Log level for the Fail2Ban logger to use"),
|
||||||
Option('-v', '--verbose', action="count", dest="verbose",
|
Option('-v', '--verbose', action="count", dest="verbose",
|
||||||
default=None,
|
default=0,
|
||||||
help="Increase verbosity"),
|
help="Increase verbosity"),
|
||||||
Option("--verbosity", action="store", dest="verbose", type=int,
|
Option("--verbosity", action="store", dest="verbose", type=int,
|
||||||
default=None,
|
|
||||||
help="Set numerical level of verbosity (0..4)"),
|
help="Set numerical level of verbosity (0..4)"),
|
||||||
Option("-D", "--debuggex", action='store_true',
|
Option("-D", "--debuggex", action='store_true',
|
||||||
help="Produce debuggex.com urls for debugging there"),
|
help="Produce debuggex.com urls for debugging there"),
|
||||||
|
@ -343,7 +342,7 @@ class Fail2banRegex(object):
|
||||||
found = True
|
found = True
|
||||||
regex = self._ignoreregex[ret].inc()
|
regex = self._ignoreregex[ret].inc()
|
||||||
except RegexException as e:
|
except RegexException as e:
|
||||||
output( e )
|
output( 'ERROR: %s' % e )
|
||||||
return False
|
return False
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
@ -360,10 +359,7 @@ class Fail2banRegex(object):
|
||||||
regex.inc()
|
regex.inc()
|
||||||
regex.appendIP(match)
|
regex.appendIP(match)
|
||||||
except RegexException as e:
|
except RegexException as e:
|
||||||
output( e )
|
output( 'ERROR: %s' % e )
|
||||||
return False
|
|
||||||
except IndexError:
|
|
||||||
output( "Sorry, but no <HOST> found in regex" )
|
|
||||||
return False
|
return False
|
||||||
for bufLine in orgLineBuffer[int(fullBuffer):]:
|
for bufLine in orgLineBuffer[int(fullBuffer):]:
|
||||||
if bufLine not in self._filter._Filter__lineBuffer:
|
if bufLine not in self._filter._Filter__lineBuffer:
|
||||||
|
@ -509,10 +505,13 @@ class Fail2banRegex(object):
|
||||||
|
|
||||||
cmd_log, cmd_regex = args[:2]
|
cmd_log, cmd_regex = args[:2]
|
||||||
|
|
||||||
if not self.readRegex(cmd_regex, 'fail'):
|
try:
|
||||||
return False
|
if not self.readRegex(cmd_regex, 'fail'):
|
||||||
|
return False
|
||||||
if len(args) == 3 and not self.readRegex(args[2], 'ignore'):
|
if len(args) == 3 and not self.readRegex(args[2], 'ignore'):
|
||||||
|
return False
|
||||||
|
except RegexException as e:
|
||||||
|
output( 'ERROR: %s' % e )
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if os.path.isfile(cmd_log):
|
if os.path.isfile(cmd_log):
|
||||||
|
@ -556,43 +555,36 @@ class Fail2banRegex(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def exec_command_line():
|
def exec_command_line(*args):
|
||||||
parser = get_opt_parser()
|
parser = get_opt_parser()
|
||||||
(opts, args) = parser.parse_args()
|
(opts, args) = parser.parse_args(*args)
|
||||||
if opts.print_no_missed and opts.print_all_missed:
|
errors = []
|
||||||
sys.stderr.write("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.\n\n")
|
if opts.print_no_missed and opts.print_all_missed: # pragma: no cover
|
||||||
parser.print_help()
|
errors.append("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.")
|
||||||
sys.exit(-1)
|
if opts.print_no_ignored and opts.print_all_ignored: # pragma: no cover
|
||||||
if opts.print_no_ignored and opts.print_all_ignored:
|
errors.append("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.")
|
||||||
sys.stderr.write("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.\n\n")
|
|
||||||
parser.print_help()
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
# We need 2 or 3 parameters
|
# We need 2 or 3 parameters
|
||||||
if not len(args) in (2, 3):
|
if not len(args) in (2, 3):
|
||||||
sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
|
errors.append("ERROR: provide both <LOG> and <REGEX>.")
|
||||||
|
if errors:
|
||||||
|
sys.stderr.write("\n".join(errors) + "\n\n")
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
return False
|
sys.exit(-1)
|
||||||
|
|
||||||
output( "" )
|
output( "" )
|
||||||
output( "Running tests" )
|
output( "Running tests" )
|
||||||
output( "=============" )
|
output( "=============" )
|
||||||
output( "" )
|
output( "" )
|
||||||
|
|
||||||
# TODO: taken from -testcases -- move common functionality somewhere
|
# Log level (default critical):
|
||||||
if opts.log_level is not None:
|
opts.log_level = str2LogLevel(opts.log_level)
|
||||||
# so we had explicit settings
|
logSys.setLevel(opts.log_level)
|
||||||
logSys.setLevel(str2LogLevel(opts.log_level))
|
|
||||||
else:
|
|
||||||
# suppress the logging but it would leave unittests' progress dots
|
|
||||||
# ticking, unless like with '-l critical' which would be silent
|
|
||||||
# unless error occurs
|
|
||||||
logSys.setLevel(logging.CRITICAL)
|
|
||||||
|
|
||||||
# Add the default logging handler
|
# Add the default logging handler
|
||||||
stdout = logging.StreamHandler(sys.stdout)
|
stdout = logging.StreamHandler(sys.stdout)
|
||||||
|
|
||||||
fmt = '%(levelname)-1.1s: %(message)s' if opts.verbose <= 1 else '%(message)s'
|
fmt = '%(levelname)-1.1s: %(message)s' if opts.verbose <= 1 else ' %(message)s'
|
||||||
|
|
||||||
if opts.log_traceback:
|
if opts.log_traceback:
|
||||||
Formatter = FormatterWithTraceBack
|
Formatter = FormatterWithTraceBack
|
||||||
|
|
|
@ -24,9 +24,10 @@ __copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Con
|
||||||
__license__ = "GPL"
|
__license__ = "GPL"
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
from ..client import fail2banregex
|
from ..client import fail2banregex
|
||||||
from ..client.fail2banregex import Fail2banRegex, get_opt_parser, output
|
from ..client.fail2banregex import Fail2banRegex, get_opt_parser, exec_command_line, output
|
||||||
from .utils import setUpMyTime, tearDownMyTime, LogCaptureTestCase, logSys
|
from .utils import setUpMyTime, tearDownMyTime, LogCaptureTestCase, logSys
|
||||||
from .utils import CONFIG_DIR
|
from .utils import CONFIG_DIR
|
||||||
|
|
||||||
|
@ -39,12 +40,38 @@ fail2banregex.output = _test_output
|
||||||
|
|
||||||
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
||||||
|
|
||||||
|
DEV_NULL = None
|
||||||
|
|
||||||
def _Fail2banRegex(*args):
|
def _Fail2banRegex(*args):
|
||||||
parser = get_opt_parser()
|
parser = get_opt_parser()
|
||||||
(opts, args) = parser.parse_args(list(args))
|
(opts, args) = parser.parse_args(list(args))
|
||||||
return (opts, args, Fail2banRegex(opts))
|
return (opts, args, Fail2banRegex(opts))
|
||||||
|
|
||||||
|
class ExitException(Exception):
|
||||||
|
def __init__(self, code):
|
||||||
|
self.code = code
|
||||||
|
self.msg = 'Exit with code: %s' % code
|
||||||
|
|
||||||
|
def _test_exec_command_line(*args):
|
||||||
|
def _exit(code=0):
|
||||||
|
raise ExitException(code)
|
||||||
|
global DEV_NULL
|
||||||
|
_org = {'exit': sys.exit, 'stdout': sys.stdout, 'stderr': sys.stderr}
|
||||||
|
_exit_code = 0
|
||||||
|
sys.exit = _exit
|
||||||
|
if not DEV_NULL: DEV_NULL = open(os.devnull, "w")
|
||||||
|
sys.stderr = sys.stdout = DEV_NULL
|
||||||
|
try:
|
||||||
|
exec_command_line(list(args))
|
||||||
|
except ExitException as e:
|
||||||
|
_exit_code = e.code
|
||||||
|
finally:
|
||||||
|
sys.exit = _org['exit']
|
||||||
|
sys.stdout = _org['stdout']
|
||||||
|
sys.stderr = _org['stderr']
|
||||||
|
return _exit_code
|
||||||
|
|
||||||
|
|
||||||
class Fail2banRegexTest(LogCaptureTestCase):
|
class Fail2banRegexTest(LogCaptureTestCase):
|
||||||
|
|
||||||
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
|
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
|
||||||
|
@ -69,14 +96,14 @@ class Fail2banRegexTest(LogCaptureTestCase):
|
||||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
"test", r".** from <HOST>$"
|
"test", r".** from <HOST>$"
|
||||||
)
|
)
|
||||||
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
self.assertFalse(fail2banRegex.start(opts, args))
|
||||||
self.assertLogged("Unable to compile regular expression")
|
self.assertLogged("Unable to compile regular expression")
|
||||||
|
|
||||||
def testWrongIngnoreRE(self):
|
def testWrongIngnoreRE(self):
|
||||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
"test", r".*? from <HOST>$", r".**"
|
"test", r".*? from <HOST>$", r".**"
|
||||||
)
|
)
|
||||||
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
self.assertFalse(fail2banRegex.start(opts, args))
|
||||||
self.assertLogged("Unable to compile regular expression")
|
self.assertLogged("Unable to compile regular expression")
|
||||||
|
|
||||||
def testDirectFound(self):
|
def testDirectFound(self):
|
||||||
|
@ -184,4 +211,21 @@ class Fail2banRegexTest(LogCaptureTestCase):
|
||||||
|
|
||||||
self.assertLogged('https://')
|
self.assertLogged('https://')
|
||||||
|
|
||||||
|
def testExecCmdLine_Usage(self):
|
||||||
|
self.assertNotEqual(_test_exec_command_line(), 0)
|
||||||
|
|
||||||
|
def testExecCmdLine_Direct(self):
|
||||||
|
self.assertEqual(_test_exec_command_line(
|
||||||
|
'-l', 'info',
|
||||||
|
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||||
|
r"Authentication failure for .*? from <HOST>$"
|
||||||
|
), 0)
|
||||||
|
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
|
||||||
|
|
||||||
|
def testExecCmdLine_MissFailID(self):
|
||||||
|
self.assertNotEqual(_test_exec_command_line(
|
||||||
|
'-l', 'info',
|
||||||
|
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||||
|
r"Authentication failure"
|
||||||
|
), 0)
|
||||||
|
self.assertLogged('No failure-id group in ')
|
||||||
|
|
Loading…
Reference in New Issue