diff --git a/bin/fail2ban-regex b/fail2ban/client/fail2banregex.py similarity index 81% rename from bin/fail2ban-regex rename to fail2ban/client/fail2banregex.py index fdbfcb7a..12dde7f1 100755 --- a/bin/fail2ban-regex +++ b/fail2ban/client/fail2banregex.py @@ -44,16 +44,16 @@ from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderErro try: from systemd import journal - from fail2ban.server.filtersystemd import FilterSystemd + from ..server.filtersystemd import FilterSystemd except ImportError: journal = None -from fail2ban.version import version -from fail2ban.client.filterreader import FilterReader -from fail2ban.server.filter import Filter -from fail2ban.server.failregex import RegexException +from ..version import version +from .filterreader import FilterReader +from ..server.filter import Filter, FileContainer +from ..server.failregex import RegexException -from fail2ban.helpers import FormatterWithTraceBack, getLogger +from ..helpers import FormatterWithTraceBack, getLogger # Gets the instance of the logger. logSys = getLogger("fail2ban") @@ -63,6 +63,9 @@ def debuggexURL(sample, regex): 'flavor': 'python' }) return 'http://www.debuggex.com/?' + q +def output(args): + print(args) + def shortstr(s, l=53): """Return shortened string """ @@ -77,22 +80,7 @@ def pprint_list(l, header=None): s = "|- %s\n" % header else: s = '' - print s + "| " + "\n| ".join(l) + '\n`-' - -def file_lines_gen(hdlr): - for line in hdlr: - try: - line = line.decode(fail2banRegex.encoding, 'strict') - except UnicodeDecodeError: - logSys.warning( - "Error decoding line from '%s' with '%s'." - " Consider setting logencoding=utf-8 (or another appropriate" - " encoding) for this jail. Continuing" - " to process line ignoring invalid characters: %r" % - ('', fail2banRegex.encoding, line)) - # decode with replacing error chars: - line = line.decode(fail2banRegex.encoding, 'replace') - yield line + output( s + "| " + "\n| ".join(l) + '\n`-' ) def journal_lines_gen(myjournal): while True: @@ -259,14 +247,14 @@ class Fail2banRegex(object): self._filter.setDatePattern(pattern) self._datepattern_set = True if pattern is not None: - print "Use datepattern : %s" % ( - self._filter.getDatePattern()[1], ) + output( "Use datepattern : %s" % ( + self._filter.getDatePattern()[1], ) ) def setMaxLines(self, v): if not self._maxlines_set: self._filter.setMaxLines(int(v)) self._maxlines_set = True - print "Use maxlines : %d" % self._filter.getMaxLines() + output( "Use maxlines : %d" % self._filter.getMaxLines() ) def setJournalMatch(self, v): if self._journalmatch is None: @@ -280,18 +268,18 @@ class Fail2banRegex(object): ## within filter.d folder - use standard loading algorithm to load filter completely (with .local etc.): basedir = os.path.dirname(os.path.dirname(value)) value = os.path.splitext(os.path.basename(value))[0] - print "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) + output( "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) ) reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir) if not reader.read(): - print "ERROR: failed to load filter %s" % value + output( "ERROR: failed to load filter %s" % value ) return False else: ## foreign file - readexplicit this file and includes if possible: - print "Use %11s file : %s" % (regex, value) + output( "Use %11s file : %s" % (regex, value) ) reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config) reader.setBaseDir(None) if not reader.readexplicit(): - print "ERROR: failed to read %s" % value + output( "ERROR: failed to read %s" % value ) return False reader.getOptions(None) readercommands = reader.convert() @@ -307,8 +295,8 @@ class Fail2banRegex(object): try: self.setMaxLines(maxlines) except ValueError: - print "ERROR: Invalid value for maxlines (%(maxlines)r) " \ - "read from %(value)s" % locals() + output( "ERROR: Invalid value for maxlines (%(maxlines)r) " \ + "read from %(value)s" % locals() ) return False elif command[2] == 'addjournalmatch': journalmatch = command[3:] @@ -317,7 +305,7 @@ class Fail2banRegex(object): datepattern = command[3] self.setDatePattern(datepattern) else: - print "Use %11s line : %s" % (regex, shortstr(value)) + output( "Use %11s line : %s" % (regex, shortstr(value)) ) regex_values = [RegexStat(value)] setattr(self, "_" + regex, regex_values) @@ -335,7 +323,7 @@ class Fail2banRegex(object): found = True regex = self._ignoreregex[ret].inc() except RegexException, e: - print e + output( e ) return False return found @@ -352,10 +340,10 @@ class Fail2banRegex(object): regex.inc() regex.appendIP(match) except RegexException, e: - print e + output( e ) return False except IndexError: - print "Sorry, but no found in regex" + output( "Sorry, but no found in regex" ) return False for bufLine in orgLineBuffer[int(fullBuffer):]: if bufLine not in self._filter._Filter__lineBuffer: @@ -376,7 +364,7 @@ class Fail2banRegex(object): t0 = time.time() for line_no, line in enumerate(test_lines): if isinstance(line, tuple): - line_datetimestripped, ret = fail2banRegex.testRegex( + line_datetimestripped, ret = self.testRegex( line[0], line[1]) line = "".join(line[0]) else: @@ -384,8 +372,8 @@ class Fail2banRegex(object): if line.startswith('#') or not line: # skip comment and empty lines continue - line_datetimestripped, ret = fail2banRegex.testRegex(line) - is_ignored = fail2banRegex.testIgnoreRegex(line_datetimestripped) + line_datetimestripped, ret = self.testRegex(line) + is_ignored = self.testIgnoreRegex(line_datetimestripped) if is_ignored: self._line_stats.ignored += 1 @@ -432,18 +420,18 @@ class Fail2banRegex(object): b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' + debuggexURL(a[0], a[1].getFailRegex()), ans) pprint_list([x.rstrip() for x in b], header) else: - print "%s too many to print. Use --print-all-%s " \ - "to print all %d lines" % (header, ltype, lines) + output( "%s too many to print. Use --print-all-%s " \ + "to print all %d lines" % (header, ltype, lines) ) elif lines < self._maxlines or getattr(self, '_print_all_' + ltype): pprint_list([x.rstrip() for x in l], header) else: - print "%s too many to print. Use --print-all-%s " \ - "to print all %d lines" % (header, ltype, lines) + output( "%s too many to print. Use --print-all-%s " \ + "to print all %d lines" % (header, ltype, lines) ) def printStats(self): - print - print "Results" - print "=======" + output( "" ) + output( "Results" ) + output( "=======" ) def print_failregexes(title, failregexes): # Print title @@ -464,7 +452,7 @@ class Fail2banRegex(object): timeString, ip[-1] and " (multiple regex matched)" or "")) - print "\n%s: %d total" % (title, total) + output( "\n%s: %d total" % (title, total) ) pprint_list(out, " #) [# of hits] regular expression") return total @@ -474,7 +462,7 @@ class Fail2banRegex(object): if self._filter.dateDetector is not None: - print "\nDate template hits:" + output( "\nDate template hits:" ) out = [] for template in self._filter.dateDetector.templates: if self._verbose or template.hits: @@ -482,10 +470,10 @@ class Fail2banRegex(object): template.hits, template.name)) pprint_list(out, "[# of hits] date format") - print "\nLines: %s" % self._line_stats, + output( "\nLines: %s" % self._line_stats, ) if self._time_elapsed is not None: - print "[processed in %.2f sec]" % self._time_elapsed, - print + output( "[processed in %.2f sec]" % self._time_elapsed, ) + output( "" ) if self._print_all_matched: self.printLines('matched') @@ -496,9 +484,62 @@ class Fail2banRegex(object): return True + def file_lines_gen(self, hdlr): + for line in hdlr: + yield FileContainer.decode_line('', self.encoding, line) -if __name__ == "__main__": + def start(self, opts, args): + cmd_log, cmd_regex = args[:2] + + if not self.readRegex(cmd_regex, 'fail'): + return False + + if len(args) == 3 and not self.readRegex(args[2], 'ignore'): + return False + + if os.path.isfile(cmd_log): + try: + hdlr = open(cmd_log, 'rb') + output( "Use log file : %s" % cmd_log ) + output( "Use encoding : %s" % self.encoding ) + test_lines = self.file_lines_gen(hdlr) + except IOError, e: + output( e ) + return False + elif cmd_log == "systemd-journal": + if not journal: + output( "Error: systemd library not found. Exiting..." ) + return False + myjournal = journal.Reader(converters={'__CURSOR': lambda x: x}) + journalmatch = self._journalmatch + self.setDatePattern(None) + if journalmatch: + try: + for element in journalmatch: + if element == "+": + myjournal.add_disjunction() + else: + myjournal.add_match(element) + except ValueError: + output( "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch)) ) + return False + output( "Use journal match : %s" % " ".join(journalmatch) ) + test_lines = journal_lines_gen(myjournal) + else: + output( "Use single line : %s" % shortstr(cmd_log) ) + test_lines = [ cmd_log ] + output( "" ) + + self.process(test_lines) + + if not self.printStats(): + return False + + return True + + +def exec_command_line(): # pragma: no cover parser = get_opt_parser() (opts, args) = parser.parse_args() if opts.print_no_missed and opts.print_all_missed: @@ -510,18 +551,16 @@ if __name__ == "__main__": parser.print_help() sys.exit(-1) - print - print "Running tests" - print "=============" - print - - fail2banRegex = Fail2banRegex(opts) - # We need 2 or 3 parameters if not len(args) in (2, 3): sys.stderr.write("ERROR: provide both and .\n\n") parser.print_help() - sys.exit(-1) + return False + + output( "" ) + output( "Running tests" ) + output( "=============" ) + output( "" ) # TODO: taken from -testcases -- move common functionality somewhere if opts.log_level is not None: # pragma: no cover @@ -552,46 +591,6 @@ if __name__ == "__main__": stdout.setFormatter(Formatter(fmt)) logSys.addHandler(stdout) - cmd_log, cmd_regex = args[:2] - - fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1) - - if len(args) == 3: - fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1) - - if os.path.isfile(cmd_log): - try: - hdlr = open(cmd_log, 'rb') - print "Use log file : %s" % cmd_log - print "Use encoding : %s" % fail2banRegex.encoding - test_lines = file_lines_gen(hdlr) - except IOError, e: - print e - sys.exit(-1) - elif cmd_log == "systemd-journal": - if not journal: - print "Error: systemd library not found. Exiting..." - sys.exit(-1) - myjournal = journal.Reader(converters={'__CURSOR': lambda x: x}) - journalmatch = fail2banRegex._journalmatch - fail2banRegex.setDatePattern(None) - if journalmatch: - try: - for element in journalmatch: - if element == "+": - myjournal.add_disjunction() - else: - myjournal.add_match(element) - except ValueError: - print "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch)) - sys.exit(-1) - print "Use journal match : %s" % " ".join(journalmatch) - test_lines = journal_lines_gen(myjournal) - else: - print "Use single line : %s" % shortstr(cmd_log) - test_lines = [ cmd_log ] - print - - fail2banRegex.process(test_lines) - - fail2banRegex.printStats() or sys.exit(-1) + fail2banRegex = Fail2banRegex(opts) + if not fail2banRegex.start(opts, args): + sys.exit(-1) diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 6fc2cd6c..65be0467 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -792,23 +792,27 @@ class FileContainer: self.__handler.seek(self.__pos) return True - def readline(self): - if self.__handler is None: - return "" - line = self.__handler.readline() + @staticmethod + def decode_line(filename, enc, line): try: - line = line.decode(self.getEncoding(), 'strict') + line = line.decode(enc, 'strict') except UnicodeDecodeError: logSys.warning( "Error decoding line from '%s' with '%s'." " Consider setting logencoding=utf-8 (or another appropriate" " encoding) for this jail. Continuing" " to process line ignoring invalid characters: %r" % - (self.getFileName(), self.getEncoding(), line)) + (filename, enc, line)) # decode with replacing error chars: - line = line.decode(self.getEncoding(), 'replace') + line = line.decode(enc, 'replace') return line + def readline(self): + if self.__handler is None: + return "" + return FileContainer.decode_line( + self.getFileName(), self.getEncoding(), self.__handler.readline()) + def close(self): if not self.__handler is None: # Saves the last position. diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py new file mode 100644 index 00000000..ee10128e --- /dev/null +++ b/fail2ban/tests/fail2banregextestcase.py @@ -0,0 +1,155 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- +# vi: set ft=python sts=4 ts=4 sw=4 noet : + +# This file is part of Fail2Ban. +# +# Fail2Ban is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# Fail2Ban is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Fail2Ban; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +# Fail2Ban developers + +__author__ = "Serg Brester" +__copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Contributors" +__license__ = "GPL" + +from __builtin__ import open as fopen +import unittest +import getpass +import os +import sys +import time +import tempfile +import uuid + +try: + from systemd import journal +except ImportError: + journal = None + +from ..client import fail2banregex +from ..client.fail2banregex import Fail2banRegex, get_opt_parser, output +from .utils import LogCaptureTestCase, logSys + + +fail2banregex.logSys = logSys +def _test_output(*args): + logSys.info(args[0]) + +fail2banregex.output = _test_output + +CONF_FILES_DIR = os.path.abspath( + os.path.join(os.path.dirname(__file__),"..", "..", "config")) +TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") + + +def _Fail2banRegex(*args): + parser = get_opt_parser() + (opts, args) = parser.parse_args(list(args)) + return (opts, args, Fail2banRegex(opts)) + +class Fail2banRegexTest(LogCaptureTestCase): + + FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log") + FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log") + FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log") + + FILTER_SSHD = os.path.join(CONF_FILES_DIR, 'filter.d', 'sshd.conf') + + def setUp(self): + """Call before every test case.""" + LogCaptureTestCase.setUp(self) + + def tearDown(self): + """Call after every test case.""" + LogCaptureTestCase.tearDown(self) + + def testWrongRE(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "test", r".** from $" + ) + self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args)) + self.assertLogged("Unable to compile regular expression") + + def testWrongIngnoreRE(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "test", r".*? from $", r".**" + ) + self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args)) + self.assertLogged("Unable to compile regular expression") + + def testDirectFound(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "--print-all-matched", "--print-no-missed", + "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", + r"Authentication failure for .*? from $" + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed') + + def testDirectNotFound(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "--print-all-missed", + "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", + r"XYZ from $" + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed') + + def testDirectIgnored(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "--print-all-ignored", + "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0", + r"Authentication failure for .*? from $", + r"kevin from 192.0.2.0$" + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed') + + def testDirectRE_1(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "--print-all-matched", + Fail2banRegexTest.FILENAME_01, + r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) " + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed') + + self.assertLogged('Error decoding line'); + self.assertLogged('Continuing to process line ignoring invalid characters') + + self.assertLogged('Dez 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128') + self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10') + + def testDirectRE_2(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + "--print-all-matched", + Fail2banRegexTest.FILENAME_02, + r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) " + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed') + + def testWronChar(self): + (opts, args, fail2banRegex) = _Fail2banRegex( + Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD + ) + self.assertTrue(fail2banRegex.start(opts, args)) + self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed') + + self.assertLogged('Error decoding line'); + self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user '); + self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user '); + + self.assertLogged('Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco') + self.assertLogged('Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llinco') diff --git a/fail2ban/tests/files/testcase-wrong-char.log b/fail2ban/tests/files/testcase-wrong-char.log new file mode 100644 index 00000000..9736020e --- /dev/null +++ b/fail2ban/tests/files/testcase-wrong-char.log @@ -0,0 +1,4 @@ +Nov 8 00:16:12 main sshd[32547]: Invalid user llinco\361ir from 192.0.2.0 +Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco\361ir +Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llincoņir +Nov 8 00:16:14 main sshd[32547]: Failed password for invalid user llinco\361ir from 192.0.2.0 port 57025 ssh2 diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index 35fa59fd..8172e7ec 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -85,6 +85,7 @@ def gatherTests(regexps=None, no_network=False): from . import misctestcase from . import databasetestcase from . import samplestestcase + from . import fail2banregextestcase if not regexps: # pragma: no cover tests = unittest.TestSuite() @@ -152,6 +153,9 @@ def gatherTests(regexps=None, no_network=False): # Filter Regex tests with sample logs tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex)) + # bin/fail2ban-regex + tests.addTest(unittest.makeSuite(fail2banregextestcase.Fail2banRegexTest)) + # # Python action testcases #