mirror of https://github.com/fail2ban/fail2ban
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
432 lines
12 KiB
432 lines
12 KiB
#!/usr/bin/python |
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- |
|
# vi: set ft=python sts=4 ts=4 sw=4 noet : |
|
# |
|
# This file is part of Fail2Ban. |
|
# |
|
# Fail2Ban is free software; you can redistribute it and/or modify |
|
# it under the terms of the GNU General Public License as published by |
|
# the Free Software Foundation; either version 2 of the License, or |
|
# (at your option) any later version. |
|
# |
|
# Fail2Ban is distributed in the hope that it will be useful, |
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
# GNU General Public License for more details. |
|
# |
|
# You should have received a copy of the GNU General Public License |
|
# along with Fail2Ban; if not, write to the Free Software |
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
|
""" |
|
Fail2Ban reads log file that contains password failure report |
|
and bans the corresponding IP addresses using firewall rules. |
|
|
|
This tool can test regular expressions for "fail2ban".
|
|
|
""" |
|
|
|
__author__ = "Cyril Jaquier, Yaroslav Halchenko" |
|
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2013 Yaroslav Halchenko" |
|
__license__ = "GPL" |
|
|
|
import getopt, sys, time, logging, os, urllib |
|
|
|
# Inserts our own modules path first in the list |
|
# fix for bug #343821 |
|
try: |
|
from common.version import version |
|
except ImportError, e: |
|
sys.path.insert(1, "/usr/share/fail2ban") |
|
from common.version import version |
|
|
|
from optparse import OptionParser, Option |
|
|
|
from client.configparserinc import SafeConfigParserWithIncludes |
|
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError |
|
from server.filter import Filter |
|
from server.failregex import RegexException |
|
|
|
from testcases.utils import FormatterWithTraceBack |
|
# Gets the instance of the logger. |
|
logSys = logging.getLogger("fail2ban") |
|
|
|
def debuggexURL(sample, regex):
	"""Return a debuggex.com URL pre-filled with `regex` and `sample`.

	Every '<HOST>' placeholder in `regex` is replaced with '(?&.ipv4)'
	before the value is URL-encoded.  The query parameters are passed as
	an ordered sequence of pairs, so the same input always yields the same
	URL ('re', then 'str', then 'flavor').
	"""
	# urlencode lives in `urllib` on Python 2 and in `urllib.parse` on
	# Python 3; import it locally so this works on either interpreter
	try:
		from urllib import urlencode
	except ImportError:
		from urllib.parse import urlencode
	q = urlencode([
		('re', regex.replace('<HOST>', '(?&.ipv4)')),
		('str', sample),
		('flavor', 'python'),
	])
	return 'http://www.debuggex.com/?' + q
|
|
|
def shortstr(s, l=53):
	"""Return `s` limited to `l` characters.

	A string that fits within `l` characters is returned unchanged; a
	longer one is cut to its first `l - 3` characters followed by '...'.
	"""
	if len(s) <= l:
		return s
	keep = l - 3
	return s[:keep] + '...'
|
|
|
def pprint_list(l, header=None):
	"""Print the strings in `l` as one block, each prefixed with "| ".

	The block is closed by a "`-" line.  When `header` is given (and not
	empty) it is printed first on a "|- " line.  Nothing at all is
	printed for an empty list.
	"""
	if len(l) == 0:
		return
	prefix = "|- %s\n" % header if header else ''
	body = "\n| ".join(l)
	# a single parenthesised argument prints the same text whether `print`
	# is the Python 2 statement or the Python 3 function
	print(prefix + "| " + body + '\n`-')
|
|
|
|
|
def get_opt_parser():
	"""Build the command-line option parser of this tool.

	The usage text consists of the synopsis line, the module docstring
	and a description of the positional LOG, REGEX and IGNOREREGEX
	arguments.

	Returns the configured `optparse.OptionParser` instance.
	"""
	# The module docstring is reused for the help output.  It is None when
	# the interpreter strips docstrings (-OO), so fall back to an empty
	# string instead of failing on `str + None`.
	description = __doc__ or ""
	usage = ("%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0]
		+ description + """
LOG:
    string                  a string representing a log line
    filename                path to a log file (/var/log/auth.log)

REGEX:
    string                  a string representing a 'failregex'
    filename                path to a filter file (filter.d/sshd.conf)

IGNOREREGEX:
    string                  a string representing an 'ignoreregex'
    filename                path to a filter file (filter.d/sshd.conf)

Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
Copyright of modifications held by their respective authors.
Licensed under the GNU General Public License v2 (GPL).

Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>.
Many contributions by Yaroslav O. Halchenko and Steven Hiscocks.

Report bugs to https://github.com/fail2ban/fail2ban/issues
""")

	p = OptionParser(usage=usage, version="%prog " + version)

	p.add_options([
		Option('-l', "--log-level", type="choice",
			dest="log_level",
			choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
			default=None,
			help="Log level for the Fail2Ban logger to use"),
		Option("-v", "--verbose", action='store_true',
			help="Be verbose in output"),
		Option("-D", "--debuggex", action='store_true',
			help="Produce debuggex.com urls for debugging there"),
		Option("--print-all-missed", action='store_true',
			help="Either to print all missed lines"),
		Option("--print-all-ignored", action='store_true',
			help="Either to print all ignored lines"),
		Option("-t", "--log-traceback", action='store_true',
			help="Enrich log-messages with compressed tracebacks"),
		Option("--full-traceback", action='store_true',
			help="Either to make the tracebacks full, not compressed (as by default)"),
	])

	return p
|
|
|
|
|
class RegexStat(object):
	"""Bookkeeping for one regular expression: a hit counter plus a list
	of the values recorded for it through `appendIP`.
	"""

	def __init__(self, failregex):
		"""Start with zero hits and an empty list for `failregex`."""
		self._failregex = failregex
		self._stats = 0
		self._ipList = []

	def __str__(self):
		fields = (self.__class__, self._failregex, self._stats, self._ipList)
		return "%s(%r) %d failed: %s" % fields

	def inc(self):
		"""Count one more hit."""
		self._stats = self._stats + 1

	def getStats(self):
		"""Return the number of hits counted so far."""
		return self._stats

	def getFailRegex(self):
		"""Return the regular expression given at construction."""
		return self._failregex

	def appendIP(self, value):
		"""Record `value` at the end of the internal list."""
		self._ipList.append(value)

	def getIPList(self):
		"""Return the internal list itself (not a copy)."""
		return self._ipList
|
|
|
|
|
class LineStats(object):
	"""Counters and line collections gathered while testing log lines.

	`tested` and `matched` are plain counters; ignored and missed lines
	are kept both as given and in their `*_timeextracted` variant.
	"""

	def __init__(self):
		self.tested = 0
		self.matched = 0
		self.missed_lines = []
		self.missed_lines_timeextracted = []
		self.ignored_lines = []
		self.ignored_lines_timeextracted = []

	def __str__(self):
		counts = (self.tested, self.ignored, self.matched, self.missed)
		return "%d lines, %d ignored, %d matched, %d missed" % counts

	@property
	def ignored(self):
		"""Number of lines collected in `ignored_lines`."""
		return len(self.ignored_lines)

	@property
	def missed(self):
		"""Tested lines that were neither ignored nor matched."""
		return self.tested - (self.ignored + self.matched)

	def __getitem__(self, key):
		# dict-style access to any attribute, e.g. stats['missed_lines']
		return getattr(self, key)
|
|
|
|
|
class Fail2banRegex(object): |
|
|
|
CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"} |
|
|
|
def __init__(self, opts): |
|
self._verbose = opts.verbose |
|
self._debuggex = opts.debuggex |
|
self._print_all_missed = opts.print_all_missed |
|
self._print_all_ignored = opts.print_all_ignored |
|
|
|
self._filter = Filter(None) |
|
self._ignoreregex = list() |
|
self._failregex = list() |
|
self._line_stats = LineStats() |
|
|
|
|
|
def readRegex(self, value, regextype): |
|
assert(regextype in ('fail', 'ignore')) |
|
regex = regextype + 'regex' |
|
if os.path.isfile(value): |
|
reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS) |
|
try: |
|
reader.read(value) |
|
print "Use %11s file : %s" % (regex, value) |
|
# TODO: reuse functionality in client |
|
regex_values = [ |
|
RegexStat(m) |
|
for m in reader.get("Definition", regex).split('\n') |
|
if m != ""] |
|
except NoSectionError: |
|
print "No [Definition] section in %s" % value |
|
return False |
|
except NoOptionError: |
|
print "No %s option in %s" % (regex, value) |
|
return False |
|
except MissingSectionHeaderError: |
|
print "No section headers in %s" % value |
|
return False |
|
else: |
|
print "Use %11s line : %s" % (regex, shortstr(value)) |
|
regex_values = [RegexStat(value)] |
|
|
|
setattr(self, "_" + regex, regex_values) |
|
for regex in regex_values: |
|
getattr( |
|
self._filter, |
|
'add%sRegex' % regextype.title())(regex.getFailRegex()) |
|
return True |
|
|
|
def testIgnoreRegex(self, line): |
|
found = False |
|
try: |
|
ret = self._filter.ignoreLine(line) |
|
if ret is not None: |
|
found = True |
|
regex = self._ignoreregex[ret].inc() |
|
except RegexException, e: |
|
print e |
|
return False |
|
return found |
|
|
|
def testRegex(self, line): |
|
try: |
|
line, ret = self._filter.processLine(line, checkAllRegex=True) |
|
for match in ret: |
|
# Append True/False flag depending if line was matched by |
|
# more than one regex |
|
match.append(len(ret)>1) |
|
regex = self._failregex[match[0]] |
|
regex.inc() |
|
regex.appendIP(match) |
|
except RegexException, e: |
|
print e |
|
return False |
|
except IndexError: |
|
print "Sorry, but no <HOST> found in regex" |
|
return False |
|
return line, ret |
|
|
|
|
|
def process(self, test_lines): |
|
|
|
for line_no, line in enumerate(test_lines): |
|
if line.startswith('#') or not line.strip(): |
|
# skip comment and empty lines |
|
continue |
|
is_ignored = fail2banRegex.testIgnoreRegex(line) |
|
line_datetimestripped, ret = fail2banRegex.testRegex(line) |
|
|
|
if is_ignored: |
|
self._line_stats.ignored_lines.append(line) |
|
self._line_stats.ignored_lines_timeextracted.append(line_datetimestripped) |
|
|
|
if len(ret) > 0: |
|
assert(not is_ignored) |
|
self._line_stats.matched += 1 |
|
else: |
|
if not is_ignored: |
|
self._line_stats.missed_lines.append(line) |
|
self._line_stats.missed_lines_timeextracted.append(line_datetimestripped) |
|
self._line_stats.tested += 1 |
|
|
|
if line_no % 10 == 0: |
|
self._filter.dateDetector.sortTemplate() |
|
|
|
|
|
|
|
def printLines(self, ltype): |
|
lstats = self._line_stats |
|
assert(len(lstats.missed_lines) == lstats.tested - (lstats.matched + lstats.ignored)) |
|
l = lstats[ltype + '_lines'] |
|
if len(l): |
|
header = "%s line(s):" % (ltype.capitalize(),) |
|
if self._debuggex: |
|
if ltype == 'missed': |
|
regexlist = self._failregex |
|
else: |
|
regexlist = self._ignoreregex |
|
l = lstats[ltype + '_lines_timeextracted'] |
|
lines = len(l)*len(regexlist) |
|
if lines < 20 or getattr(self, '_print_all_' + ltype): |
|
ans = [[]] |
|
for arg in [l, regexlist]: |
|
ans = [ x + [y] for x in ans for y in arg ] |
|
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' + debuggexURL(a[0], a[1].getFailRegex()), ans) |
|
pprint_list([x.rstrip() for x in b], header) |
|
else: |
|
print "%s: too many to print. Use --print-all-%s " \ |
|
"to print all %d lines" % (header, ltype, lines) |
|
elif len(l) < 20 or getattr(self, '_print_all_' + ltype): |
|
pprint_list([x.rstrip() for x in l], header) |
|
else: |
|
print "%s: too many to print. Use --print-all-%s " \ |
|
"to print all %d lines" % (header, ltype, len(l)) |
|
|
|
def printStats(self): |
|
print |
|
print "Results" |
|
print "=======" |
|
|
|
def print_failregexes(title, failregexes): |
|
# Print title |
|
total, out = 0, [] |
|
for cnt, failregex in enumerate(failregexes): |
|
match = failregex.getStats() |
|
total += match |
|
if (match or self._verbose): |
|
out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex())) |
|
|
|
if self._verbose and len(failregex.getIPList()): |
|
for ip in failregex.getIPList(): |
|
timeTuple = time.localtime(ip[2]) |
|
timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple) |
|
out.append( |
|
" %s %s%s" % ( |
|
ip[1], |
|
timeString, |
|
ip[3] and " (multiple regex matched)" or "")) |
|
|
|
print "\n%s: %d total" % (title, total) |
|
pprint_list(out, " #) [# of hits] regular expression") |
|
return total |
|
|
|
# Print title |
|
total = print_failregexes("Failregex", self._failregex) |
|
_ = print_failregexes("Ignoreregex", self._ignoreregex) |
|
|
|
|
|
print "\nDate template hits:" |
|
out = [] |
|
for template in self._filter.dateDetector.getTemplates(): |
|
if self._verbose or template.getHits(): |
|
out.append("[%d] %s" % (template.getHits(), template.getName())) |
|
pprint_list(out, "[# of hits] date format") |
|
|
|
print "\nLines: %s" % self._line_stats |
|
|
|
self.printLines('ignored') |
|
self.printLines('missed') |
|
|
|
return True |
|
|
|
|
|
if __name__ == "__main__":
	# Script entry point: parse the command line, load the regexes, run
	# them over the given log (a file or a single line) and print the
	# statistics.  Exits with -1 on any usage or loading error.

	parser = get_opt_parser()
	(opts, args) = parser.parse_args()

	# Deliberately a module-level name (not a local of some main()
	# function) so that code looking it up as a global keeps working.
	fail2banRegex = Fail2banRegex(opts)

	# We need 2 or 3 parameters
	if len(args) not in (2, 3):
		sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
		parser.print_help()
		sys.exit(-1)

	# TODO: taken from -testcases -- move common functionality somewhere
	if opts.log_level is not None: # pragma: no cover
		# so we had explicit settings
		logSys.setLevel(getattr(logging, opts.log_level.upper()))
	else: # pragma: no cover
		# no explicit level requested: only let fatal messages through
		logSys.setLevel(logging.FATAL)

	# Add the default logging handler
	stdout = logging.StreamHandler(sys.stdout)

	fmt = 'D: %(message)s'

	if opts.log_traceback:
		Formatter = FormatterWithTraceBack
		fmt = (' %(tb)s' if opts.full_traceback else ' %(tbc)s') + fmt
	else:
		Formatter = logging.Formatter

	# Custom log format for the verbose tests runs
	# NOTE(review): --verbose is declared as a store_true flag, so its
	# value is never greater than 1 and this branch looks unreachable.
	if opts.verbose > 1: # pragma: no cover
		stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
	else: # pragma: no cover
		# just prefix with the space
		stdout.setFormatter(Formatter(fmt))
	logSys.addHandler(stdout)

	print("")
	print("Running tests")
	print("=============")
	print("")

	cmd_log, cmd_regex = args[:2]

	# the optional ignoreregex is loaded before the failregex
	if len(args) == 3:
		if not fail2banRegex.readRegex(args[2], 'ignore'):
			sys.exit(-1)

	if not fail2banRegex.readRegex(cmd_regex, 'fail'):
		sys.exit(-1)

	hdlr = None
	if os.path.isfile(cmd_log):
		try:
			hdlr = open(cmd_log)
		except IOError as e:
			print(e)
			sys.exit(-1)
		# same column layout as the "Use ... file/line" output of readRegex
		print("Use %11s file : %s" % ("log", cmd_log))
		test_lines = hdlr # Iterable
	else:
		print("Use %11s line : %s" % ("single", shortstr(cmd_log)))
		test_lines = [ cmd_log ]
	print("")

	try:
		fail2banRegex.process(test_lines)
	finally:
		# do not leak the log file handle, even if processing fails
		if hdlr is not None:
			hdlr.close()

	if not fail2banRegex.printStats():
		sys.exit(-1)
|
|
|