mirror of https://github.com/fail2ban/fail2ban
fail2ban-regex moved to the client + test cases for initial coverage added
parent
46b116e86a
commit
0877d66228
|
@ -44,16 +44,16 @@ from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderErro
|
|||
|
||||
try:
|
||||
from systemd import journal
|
||||
from fail2ban.server.filtersystemd import FilterSystemd
|
||||
from ..server.filtersystemd import FilterSystemd
|
||||
except ImportError:
|
||||
journal = None
|
||||
|
||||
from fail2ban.version import version
|
||||
from fail2ban.client.filterreader import FilterReader
|
||||
from fail2ban.server.filter import Filter
|
||||
from fail2ban.server.failregex import RegexException
|
||||
from ..version import version
|
||||
from .filterreader import FilterReader
|
||||
from ..server.filter import Filter, FileContainer
|
||||
from ..server.failregex import RegexException
|
||||
|
||||
from fail2ban.helpers import FormatterWithTraceBack, getLogger
|
||||
from ..helpers import FormatterWithTraceBack, getLogger
|
||||
# Gets the instance of the logger.
|
||||
logSys = getLogger("fail2ban")
|
||||
|
||||
|
@ -63,6 +63,9 @@ def debuggexURL(sample, regex):
|
|||
'flavor': 'python' })
|
||||
return 'http://www.debuggex.com/?' + q
|
||||
|
||||
def output(args):
|
||||
print(args)
|
||||
|
||||
def shortstr(s, l=53):
|
||||
"""Return shortened string
|
||||
"""
|
||||
|
@ -77,22 +80,7 @@ def pprint_list(l, header=None):
|
|||
s = "|- %s\n" % header
|
||||
else:
|
||||
s = ''
|
||||
print s + "| " + "\n| ".join(l) + '\n`-'
|
||||
|
||||
def file_lines_gen(hdlr):
|
||||
for line in hdlr:
|
||||
try:
|
||||
line = line.decode(fail2banRegex.encoding, 'strict')
|
||||
except UnicodeDecodeError:
|
||||
logSys.warning(
|
||||
"Error decoding line from '%s' with '%s'."
|
||||
" Consider setting logencoding=utf-8 (or another appropriate"
|
||||
" encoding) for this jail. Continuing"
|
||||
" to process line ignoring invalid characters: %r" %
|
||||
('<LOG>', fail2banRegex.encoding, line))
|
||||
# decode with replacing error chars:
|
||||
line = line.decode(fail2banRegex.encoding, 'replace')
|
||||
yield line
|
||||
output( s + "| " + "\n| ".join(l) + '\n`-' )
|
||||
|
||||
def journal_lines_gen(myjournal):
|
||||
while True:
|
||||
|
@ -259,14 +247,14 @@ class Fail2banRegex(object):
|
|||
self._filter.setDatePattern(pattern)
|
||||
self._datepattern_set = True
|
||||
if pattern is not None:
|
||||
print "Use datepattern : %s" % (
|
||||
self._filter.getDatePattern()[1], )
|
||||
output( "Use datepattern : %s" % (
|
||||
self._filter.getDatePattern()[1], ) )
|
||||
|
||||
def setMaxLines(self, v):
|
||||
if not self._maxlines_set:
|
||||
self._filter.setMaxLines(int(v))
|
||||
self._maxlines_set = True
|
||||
print "Use maxlines : %d" % self._filter.getMaxLines()
|
||||
output( "Use maxlines : %d" % self._filter.getMaxLines() )
|
||||
|
||||
def setJournalMatch(self, v):
|
||||
if self._journalmatch is None:
|
||||
|
@ -280,18 +268,18 @@ class Fail2banRegex(object):
|
|||
## within filter.d folder - use standard loading algorithm to load filter completely (with .local etc.):
|
||||
basedir = os.path.dirname(os.path.dirname(value))
|
||||
value = os.path.splitext(os.path.basename(value))[0]
|
||||
print "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir)
|
||||
output( "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) )
|
||||
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir)
|
||||
if not reader.read():
|
||||
print "ERROR: failed to load filter %s" % value
|
||||
output( "ERROR: failed to load filter %s" % value )
|
||||
return False
|
||||
else:
|
||||
## foreign file - readexplicit this file and includes if possible:
|
||||
print "Use %11s file : %s" % (regex, value)
|
||||
output( "Use %11s file : %s" % (regex, value) )
|
||||
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config)
|
||||
reader.setBaseDir(None)
|
||||
if not reader.readexplicit():
|
||||
print "ERROR: failed to read %s" % value
|
||||
output( "ERROR: failed to read %s" % value )
|
||||
return False
|
||||
reader.getOptions(None)
|
||||
readercommands = reader.convert()
|
||||
|
@ -307,8 +295,8 @@ class Fail2banRegex(object):
|
|||
try:
|
||||
self.setMaxLines(maxlines)
|
||||
except ValueError:
|
||||
print "ERROR: Invalid value for maxlines (%(maxlines)r) " \
|
||||
"read from %(value)s" % locals()
|
||||
output( "ERROR: Invalid value for maxlines (%(maxlines)r) " \
|
||||
"read from %(value)s" % locals() )
|
||||
return False
|
||||
elif command[2] == 'addjournalmatch':
|
||||
journalmatch = command[3:]
|
||||
|
@ -317,7 +305,7 @@ class Fail2banRegex(object):
|
|||
datepattern = command[3]
|
||||
self.setDatePattern(datepattern)
|
||||
else:
|
||||
print "Use %11s line : %s" % (regex, shortstr(value))
|
||||
output( "Use %11s line : %s" % (regex, shortstr(value)) )
|
||||
regex_values = [RegexStat(value)]
|
||||
|
||||
setattr(self, "_" + regex, regex_values)
|
||||
|
@ -335,7 +323,7 @@ class Fail2banRegex(object):
|
|||
found = True
|
||||
regex = self._ignoreregex[ret].inc()
|
||||
except RegexException, e:
|
||||
print e
|
||||
output( e )
|
||||
return False
|
||||
return found
|
||||
|
||||
|
@ -352,10 +340,10 @@ class Fail2banRegex(object):
|
|||
regex.inc()
|
||||
regex.appendIP(match)
|
||||
except RegexException, e:
|
||||
print e
|
||||
output( e )
|
||||
return False
|
||||
except IndexError:
|
||||
print "Sorry, but no <HOST> found in regex"
|
||||
output( "Sorry, but no <HOST> found in regex" )
|
||||
return False
|
||||
for bufLine in orgLineBuffer[int(fullBuffer):]:
|
||||
if bufLine not in self._filter._Filter__lineBuffer:
|
||||
|
@ -376,7 +364,7 @@ class Fail2banRegex(object):
|
|||
t0 = time.time()
|
||||
for line_no, line in enumerate(test_lines):
|
||||
if isinstance(line, tuple):
|
||||
line_datetimestripped, ret = fail2banRegex.testRegex(
|
||||
line_datetimestripped, ret = self.testRegex(
|
||||
line[0], line[1])
|
||||
line = "".join(line[0])
|
||||
else:
|
||||
|
@ -384,8 +372,8 @@ class Fail2banRegex(object):
|
|||
if line.startswith('#') or not line:
|
||||
# skip comment and empty lines
|
||||
continue
|
||||
line_datetimestripped, ret = fail2banRegex.testRegex(line)
|
||||
is_ignored = fail2banRegex.testIgnoreRegex(line_datetimestripped)
|
||||
line_datetimestripped, ret = self.testRegex(line)
|
||||
is_ignored = self.testIgnoreRegex(line_datetimestripped)
|
||||
|
||||
if is_ignored:
|
||||
self._line_stats.ignored += 1
|
||||
|
@ -432,18 +420,18 @@ class Fail2banRegex(object):
|
|||
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' + debuggexURL(a[0], a[1].getFailRegex()), ans)
|
||||
pprint_list([x.rstrip() for x in b], header)
|
||||
else:
|
||||
print "%s too many to print. Use --print-all-%s " \
|
||||
"to print all %d lines" % (header, ltype, lines)
|
||||
output( "%s too many to print. Use --print-all-%s " \
|
||||
"to print all %d lines" % (header, ltype, lines) )
|
||||
elif lines < self._maxlines or getattr(self, '_print_all_' + ltype):
|
||||
pprint_list([x.rstrip() for x in l], header)
|
||||
else:
|
||||
print "%s too many to print. Use --print-all-%s " \
|
||||
"to print all %d lines" % (header, ltype, lines)
|
||||
output( "%s too many to print. Use --print-all-%s " \
|
||||
"to print all %d lines" % (header, ltype, lines) )
|
||||
|
||||
def printStats(self):
|
||||
print
|
||||
print "Results"
|
||||
print "======="
|
||||
output( "" )
|
||||
output( "Results" )
|
||||
output( "=======" )
|
||||
|
||||
def print_failregexes(title, failregexes):
|
||||
# Print title
|
||||
|
@ -464,7 +452,7 @@ class Fail2banRegex(object):
|
|||
timeString,
|
||||
ip[-1] and " (multiple regex matched)" or ""))
|
||||
|
||||
print "\n%s: %d total" % (title, total)
|
||||
output( "\n%s: %d total" % (title, total) )
|
||||
pprint_list(out, " #) [# of hits] regular expression")
|
||||
return total
|
||||
|
||||
|
@ -474,7 +462,7 @@ class Fail2banRegex(object):
|
|||
|
||||
|
||||
if self._filter.dateDetector is not None:
|
||||
print "\nDate template hits:"
|
||||
output( "\nDate template hits:" )
|
||||
out = []
|
||||
for template in self._filter.dateDetector.templates:
|
||||
if self._verbose or template.hits:
|
||||
|
@ -482,10 +470,10 @@ class Fail2banRegex(object):
|
|||
template.hits, template.name))
|
||||
pprint_list(out, "[# of hits] date format")
|
||||
|
||||
print "\nLines: %s" % self._line_stats,
|
||||
output( "\nLines: %s" % self._line_stats, )
|
||||
if self._time_elapsed is not None:
|
||||
print "[processed in %.2f sec]" % self._time_elapsed,
|
||||
print
|
||||
output( "[processed in %.2f sec]" % self._time_elapsed, )
|
||||
output( "" )
|
||||
|
||||
if self._print_all_matched:
|
||||
self.printLines('matched')
|
||||
|
@ -496,9 +484,62 @@ class Fail2banRegex(object):
|
|||
|
||||
return True
|
||||
|
||||
def file_lines_gen(self, hdlr):
|
||||
for line in hdlr:
|
||||
yield FileContainer.decode_line('<LOG>', self.encoding, line)
|
||||
|
||||
if __name__ == "__main__":
|
||||
def start(self, opts, args):
|
||||
|
||||
cmd_log, cmd_regex = args[:2]
|
||||
|
||||
if not self.readRegex(cmd_regex, 'fail'):
|
||||
return False
|
||||
|
||||
if len(args) == 3 and not self.readRegex(args[2], 'ignore'):
|
||||
return False
|
||||
|
||||
if os.path.isfile(cmd_log):
|
||||
try:
|
||||
hdlr = open(cmd_log, 'rb')
|
||||
output( "Use log file : %s" % cmd_log )
|
||||
output( "Use encoding : %s" % self.encoding )
|
||||
test_lines = self.file_lines_gen(hdlr)
|
||||
except IOError, e:
|
||||
output( e )
|
||||
return False
|
||||
elif cmd_log == "systemd-journal":
|
||||
if not journal:
|
||||
output( "Error: systemd library not found. Exiting..." )
|
||||
return False
|
||||
myjournal = journal.Reader(converters={'__CURSOR': lambda x: x})
|
||||
journalmatch = self._journalmatch
|
||||
self.setDatePattern(None)
|
||||
if journalmatch:
|
||||
try:
|
||||
for element in journalmatch:
|
||||
if element == "+":
|
||||
myjournal.add_disjunction()
|
||||
else:
|
||||
myjournal.add_match(element)
|
||||
except ValueError:
|
||||
output( "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch)) )
|
||||
return False
|
||||
output( "Use journal match : %s" % " ".join(journalmatch) )
|
||||
test_lines = journal_lines_gen(myjournal)
|
||||
else:
|
||||
output( "Use single line : %s" % shortstr(cmd_log) )
|
||||
test_lines = [ cmd_log ]
|
||||
output( "" )
|
||||
|
||||
self.process(test_lines)
|
||||
|
||||
if not self.printStats():
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def exec_command_line(): # pragma: no cover
|
||||
parser = get_opt_parser()
|
||||
(opts, args) = parser.parse_args()
|
||||
if opts.print_no_missed and opts.print_all_missed:
|
||||
|
@ -510,18 +551,16 @@ if __name__ == "__main__":
|
|||
parser.print_help()
|
||||
sys.exit(-1)
|
||||
|
||||
print
|
||||
print "Running tests"
|
||||
print "============="
|
||||
print
|
||||
|
||||
fail2banRegex = Fail2banRegex(opts)
|
||||
|
||||
# We need 2 or 3 parameters
|
||||
if not len(args) in (2, 3):
|
||||
sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
|
||||
parser.print_help()
|
||||
sys.exit(-1)
|
||||
return False
|
||||
|
||||
output( "" )
|
||||
output( "Running tests" )
|
||||
output( "=============" )
|
||||
output( "" )
|
||||
|
||||
# TODO: taken from -testcases -- move common functionality somewhere
|
||||
if opts.log_level is not None: # pragma: no cover
|
||||
|
@ -552,46 +591,6 @@ if __name__ == "__main__":
|
|||
stdout.setFormatter(Formatter(fmt))
|
||||
logSys.addHandler(stdout)
|
||||
|
||||
cmd_log, cmd_regex = args[:2]
|
||||
|
||||
fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1)
|
||||
|
||||
if len(args) == 3:
|
||||
fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1)
|
||||
|
||||
if os.path.isfile(cmd_log):
|
||||
try:
|
||||
hdlr = open(cmd_log, 'rb')
|
||||
print "Use log file : %s" % cmd_log
|
||||
print "Use encoding : %s" % fail2banRegex.encoding
|
||||
test_lines = file_lines_gen(hdlr)
|
||||
except IOError, e:
|
||||
print e
|
||||
sys.exit(-1)
|
||||
elif cmd_log == "systemd-journal":
|
||||
if not journal:
|
||||
print "Error: systemd library not found. Exiting..."
|
||||
sys.exit(-1)
|
||||
myjournal = journal.Reader(converters={'__CURSOR': lambda x: x})
|
||||
journalmatch = fail2banRegex._journalmatch
|
||||
fail2banRegex.setDatePattern(None)
|
||||
if journalmatch:
|
||||
try:
|
||||
for element in journalmatch:
|
||||
if element == "+":
|
||||
myjournal.add_disjunction()
|
||||
else:
|
||||
myjournal.add_match(element)
|
||||
except ValueError:
|
||||
print "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch))
|
||||
sys.exit(-1)
|
||||
print "Use journal match : %s" % " ".join(journalmatch)
|
||||
test_lines = journal_lines_gen(myjournal)
|
||||
else:
|
||||
print "Use single line : %s" % shortstr(cmd_log)
|
||||
test_lines = [ cmd_log ]
|
||||
print
|
||||
|
||||
fail2banRegex.process(test_lines)
|
||||
|
||||
fail2banRegex.printStats() or sys.exit(-1)
|
||||
fail2banRegex = Fail2banRegex(opts)
|
||||
if not fail2banRegex.start(opts, args):
|
||||
sys.exit(-1)
|
|
@ -792,23 +792,27 @@ class FileContainer:
|
|||
self.__handler.seek(self.__pos)
|
||||
return True
|
||||
|
||||
def readline(self):
|
||||
if self.__handler is None:
|
||||
return ""
|
||||
line = self.__handler.readline()
|
||||
@staticmethod
|
||||
def decode_line(filename, enc, line):
|
||||
try:
|
||||
line = line.decode(self.getEncoding(), 'strict')
|
||||
line = line.decode(enc, 'strict')
|
||||
except UnicodeDecodeError:
|
||||
logSys.warning(
|
||||
"Error decoding line from '%s' with '%s'."
|
||||
" Consider setting logencoding=utf-8 (or another appropriate"
|
||||
" encoding) for this jail. Continuing"
|
||||
" to process line ignoring invalid characters: %r" %
|
||||
(self.getFileName(), self.getEncoding(), line))
|
||||
(filename, enc, line))
|
||||
# decode with replacing error chars:
|
||||
line = line.decode(self.getEncoding(), 'replace')
|
||||
line = line.decode(enc, 'replace')
|
||||
return line
|
||||
|
||||
def readline(self):
|
||||
if self.__handler is None:
|
||||
return ""
|
||||
return FileContainer.decode_line(
|
||||
self.getFileName(), self.getEncoding(), self.__handler.readline())
|
||||
|
||||
def close(self):
|
||||
if not self.__handler is None:
|
||||
# Saves the last position.
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
||||
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
||||
|
||||
# This file is part of Fail2Ban.
|
||||
#
|
||||
# Fail2Ban is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Fail2Ban is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Fail2Ban; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
||||
# Fail2Ban developers
|
||||
|
||||
__author__ = "Serg Brester"
|
||||
__copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Contributors"
|
||||
__license__ = "GPL"
|
||||
|
||||
from __builtin__ import open as fopen
|
||||
import unittest
|
||||
import getpass
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
try:
|
||||
from systemd import journal
|
||||
except ImportError:
|
||||
journal = None
|
||||
|
||||
from ..client import fail2banregex
|
||||
from ..client.fail2banregex import Fail2banRegex, get_opt_parser, output
|
||||
from .utils import LogCaptureTestCase, logSys
|
||||
|
||||
|
||||
fail2banregex.logSys = logSys
|
||||
def _test_output(*args):
|
||||
logSys.info(args[0])
|
||||
|
||||
fail2banregex.output = _test_output
|
||||
|
||||
CONF_FILES_DIR = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__),"..", "..", "config"))
|
||||
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
||||
|
||||
|
||||
def _Fail2banRegex(*args):
|
||||
parser = get_opt_parser()
|
||||
(opts, args) = parser.parse_args(list(args))
|
||||
return (opts, args, Fail2banRegex(opts))
|
||||
|
||||
class Fail2banRegexTest(LogCaptureTestCase):
|
||||
|
||||
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
|
||||
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
|
||||
FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
|
||||
|
||||
FILTER_SSHD = os.path.join(CONF_FILES_DIR, 'filter.d', 'sshd.conf')
|
||||
|
||||
def setUp(self):
|
||||
"""Call before every test case."""
|
||||
LogCaptureTestCase.setUp(self)
|
||||
|
||||
def tearDown(self):
|
||||
"""Call after every test case."""
|
||||
LogCaptureTestCase.tearDown(self)
|
||||
|
||||
def testWrongRE(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"test", r".** from <HOST>$"
|
||||
)
|
||||
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
||||
self.assertLogged("Unable to compile regular expression")
|
||||
|
||||
def testWrongIngnoreRE(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"test", r".*? from <HOST>$", r".**"
|
||||
)
|
||||
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
||||
self.assertLogged("Unable to compile regular expression")
|
||||
|
||||
def testDirectFound(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"--print-all-matched", "--print-no-missed",
|
||||
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||
r"Authentication failure for .*? from <HOST>$"
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
|
||||
|
||||
def testDirectNotFound(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"--print-all-missed",
|
||||
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||
r"XYZ from <HOST>$"
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed')
|
||||
|
||||
def testDirectIgnored(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"--print-all-ignored",
|
||||
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||
r"Authentication failure for .*? from <HOST>$",
|
||||
r"kevin from 192.0.2.0$"
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed')
|
||||
|
||||
def testDirectRE_1(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"--print-all-matched",
|
||||
Fail2banRegexTest.FILENAME_01,
|
||||
r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
|
||||
|
||||
self.assertLogged('Error decoding line');
|
||||
self.assertLogged('Continuing to process line ignoring invalid characters')
|
||||
|
||||
self.assertLogged('Dez 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128')
|
||||
self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10')
|
||||
|
||||
def testDirectRE_2(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
"--print-all-matched",
|
||||
Fail2banRegexTest.FILENAME_02,
|
||||
r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
|
||||
|
||||
def testWronChar(self):
|
||||
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
|
||||
)
|
||||
self.assertTrue(fail2banRegex.start(opts, args))
|
||||
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
|
||||
|
||||
self.assertLogged('Error decoding line');
|
||||
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user ');
|
||||
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user ');
|
||||
|
||||
self.assertLogged('Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco')
|
||||
self.assertLogged('Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llinco')
|
|
@ -0,0 +1,4 @@
|
|||
Nov 8 00:16:12 main sshd[32547]: Invalid user llinco\361ir from 192.0.2.0
|
||||
Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco\361ir
|
||||
Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llincoñir
|
||||
Nov 8 00:16:14 main sshd[32547]: Failed password for invalid user llinco\361ir from 192.0.2.0 port 57025 ssh2
|
|
@ -85,6 +85,7 @@ def gatherTests(regexps=None, no_network=False):
|
|||
from . import misctestcase
|
||||
from . import databasetestcase
|
||||
from . import samplestestcase
|
||||
from . import fail2banregextestcase
|
||||
|
||||
if not regexps: # pragma: no cover
|
||||
tests = unittest.TestSuite()
|
||||
|
@ -152,6 +153,9 @@ def gatherTests(regexps=None, no_network=False):
|
|||
# Filter Regex tests with sample logs
|
||||
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
||||
|
||||
# bin/fail2ban-regex
|
||||
tests.addTest(unittest.makeSuite(fail2banregextestcase.Fail2banRegexTest))
|
||||
|
||||
#
|
||||
# Python action testcases
|
||||
#
|
||||
|
|
Loading…
Reference in New Issue