#!/usr/bin/python
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
#
# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Fail2Ban reads log file that contains password failure report
and bans the corresponding IP addresses using firewall rules.

This tools can test regular expressions for "fail2ban".

Report bugs to https://github.com/fail2ban/fail2ban/issues
"""

__author__ = "Cyril Jaquier, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2013 Yaroslav Halchenko"
__license__ = "GPL"

import getopt
import sys
import time
import logging
import os

# Inserts our own modules path first in the list
# fix for bug #343821
try:
	from common.version import version
except ImportError:
	sys.path.insert(1, "/usr/share/fail2ban")
	from common.version import version

from client.configparserinc import SafeConfigParserWithIncludes
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
from server.filter import Filter
from server.failregex import RegexException

from optparse import OptionParser, Option

# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.regex")

# NOTE: all output below uses the single-argument ``print(...)`` form and
# ``sys.exc_info()[1]`` instead of ``except X, e``.  Both behave exactly like
# the Python 2 statements they replace while also being valid Python 3 syntax.


def shortstr(s, l=53):
	"""Return `s` cut down to at most `l` characters.

	When `s` is longer than `l`, the result is the first ``l - 3``
	characters followed by '...'; otherwise `s` is returned unchanged.
	"""
	if len(s) > l:
		return s[:l-3] + '...'
	return s


def pprint_list(l, header=None):
	"""Print the strings in `l` as a "| "-prefixed block closed by "`-".

	Nothing is printed for an empty list.  When `header` is given it is
	printed first on its own "|- " line.
	"""
	if not len(l):
		return
	if header:
		s = "|- %s\n" % header
	else:
		s = ''
	print(s + "| " + "\n| ".join(l) + '\n`-')


def get_opt_parser():
	"""Build and return the OptionParser for the command line."""
	# use module docstring for help output
	p = OptionParser(
		# <LOG> and <REGEX> are the two mandatory positional arguments
		# described in the help text below.
		usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__
		+ """
LOG:
    string                  a string representing a log line
    filename                path to a log file (/var/log/auth.log)

REGEX:
    string                  a string representing a 'failregex'
    filename                path to a filter file (filter.d/sshd.conf)

IGNOREREGEX:
    string                  a string representing an 'ignoreregex'
    filename                path to a filter file (filter.d/sshd.conf)
""",
		version="%prog " + version)

	p.add_options([
		Option('-l', "--log-level", type="choice",
			dest="log_level",
			choices=('debug', 'info', 'warn', 'error', 'fatal'),
			default=None,
			help="Log level for the Fail2Ban logger to use"),
		Option("-v", "--verbose", action='store_true',
			help="Be verbose in output"),
		Option("--print-all-missed", action='store_true',
			help="Either to print all missed lines"),
		Option("--print-all-ignored", action='store_true',
			help="Either to print all ignored lines"),
		])

	return p


class RegexStat(object):
	"""Hit counter and list of matched entries for one regular expression."""

	def __init__(self, failregex):
		self._stats = 0
		self._failregex = failregex
		self._ipList = list()

	def __str__(self):
		return "%s(%r) %d failed: %s" \
			% (self.__class__, self._failregex, self._stats, self._ipList)

	def inc(self):
		"""Count one more hit for this regex."""
		self._stats += 1

	def getStats(self):
		"""Return the number of hits counted so far."""
		return self._stats

	def getFailRegex(self):
		"""Return the regular expression string this object tracks."""
		return self._failregex

	def appendIP(self, value):
		"""Extend the stored entry list with every item of `value`."""
		self._ipList.extend(value)

	def getIPList(self):
		"""Return the list of entries collected via appendIP()."""
		return self._ipList


class LineStats(object):
	"""Just a convenience container for stats
	"""
	def __init__(self):
		self.tested = self.matched = 0
		self.missed_lines = []
		self.ignored_lines = []

	def __str__(self):
		# relies on __getitem__ below to resolve the %(name)d keys
		return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self

	@property
	def ignored(self):
		"""Number of lines recorded as ignored."""
		return len(self.ignored_lines)

	@property
	def missed(self):
		"""Number of tested lines that were neither ignored nor matched."""
		return self.tested - (self.ignored + self.matched)

	# just for convenient str
	def __getitem__(self, key):
		return getattr(self, key)


class Fail2banRegex(object):
	"""Run fail/ignore regular expressions over log lines and report hits."""

	# Stream handed to logging.StreamHandler in __init__; with None the
	# handler falls back to its default stream.
	test = None

	CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"}

	def __init__(self, opts):
		"""Store the command-line switches from `opts` and set up logging.

		`opts` must provide `verbose`, `print_all_missed` and
		`print_all_ignored` attributes.
		"""
		self._verbose = opts.verbose
		self._print_all_missed = opts.print_all_missed
		self._print_all_ignored = opts.print_all_ignored

		self._filter = Filter(None)
		self._ignoreregex = list()
		self._failregex = list()
		self._line_stats = LineStats()

		# Setup logging
		logging.getLogger("fail2ban").handlers = []
		self._hdlr = logging.StreamHandler(Fail2banRegex.test)
		# set a format which is simpler for console use
		formatter = logging.Formatter("%(message)s")
		# tell the handler to use this format
		self._hdlr.setFormatter(formatter)
		self._logging_level = self._verbose and logging.DEBUG or logging.WARN
		logging.getLogger("fail2ban").addHandler(self._hdlr)
		logging.getLogger("fail2ban").setLevel(logging.ERROR)

	def readRegex(self, value, regextype):
		"""Load the 'fail' or 'ignore' regexes from `value`.

		`value` is either the path of an existing filter file, whose
		[Definition] section supplies the "<regextype>regex" option (one
		regex per line), or a literal regular expression.

		Returns True on success, False (after printing the reason) when
		the file lacks the section, the option or any section header.
		"""
		assert(regextype in ('fail', 'ignore'))
		regex = regextype + 'regex'
		if os.path.isfile(value):
			reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
			try:
				reader.read(value)
				print("Use %11s file : %s" % (regex, value))
				# TODO: reuse functionality in client
				# Blank entries (empty option, blank continuation line) are
				# skipped: an empty pattern is not a usable regex and, as an
				# ignoreregex, would match every line.
				regex_values = [
					RegexStat(m)
					for m in reader.get("Definition", regex).split('\n')
					if m != ""]
			except NoSectionError:
				print("No [Definition] section in %s" % value)
				return False
			except NoOptionError:
				print("No %s option in %s" % (regex, value))
				return False
			except MissingSectionHeaderError:
				print("No section headers in %s" % value)
				return False
		else:
			print("Use %11s line : %s" % (regex, shortstr(value)))
			regex_values = [RegexStat(value)]

		setattr(self, "_" + regex, regex_values)
		return True

	def testIgnoreRegex(self, line):
		"""Return True if any loaded ignoreregex matches `line`.

		Each regex is installed in the filter on its own, tested, and
		removed again; matching regexes get their hit counter bumped.
		Returns False (after printing the error) on a RegexException.
		"""
		found = False
		for regex in self._ignoreregex:
			logging.getLogger("fail2ban").setLevel(self._logging_level)
			try:
				self._filter.addIgnoreRegex(regex.getFailRegex())
				try:
					ret = self._filter.ignoreLine(line)
					if ret:
						found = True
						regex.inc()
				except RegexException:
					print(sys.exc_info()[1])
					return False
			finally:
				self._filter.delIgnoreRegex(0)
				# NOTE(review): this re-applies the level set at the top of
				# the loop, so it is a no-op; testRegex() resets to CRITICAL
				# instead -- confirm which one is intended.
				logging.getLogger("fail2ban").setLevel(self._logging_level)
		return found

	def testRegex(self, line):
		"""Return True if any loaded failregex matches `line`.

		All ignoreregexes are installed for the duration of the call.
		Each failregex is then installed on its own and `line` is run
		through the filter; every resulting entry is stored on the
		matching RegexStat with a trailing flag telling whether an
		earlier failregex had already matched this line.
		Returns False (after printing the error) on a RegexException or
		an IndexError.
		"""
		found = False
		for regex in self._ignoreregex:
			self._filter.addIgnoreRegex(regex.getFailRegex())
		for regex in self._failregex:
			# logging.getLogger("fail2ban").setLevel(logging.DEBUG)
			try:
				self._filter.addFailRegex(regex.getFailRegex())
				try:
					ret = self._filter.processLine(line)
					if len(ret):
						already_matched = found
						found = True
						# Flag every entry, not just the first one:
						# printStats() reads index 2 of each stored entry.
						for entry in ret:
							entry.append(already_matched)
						regex.inc()
						regex.appendIP(ret)
				except RegexException:
					print(sys.exc_info()[1])
					return False
				except IndexError:
					print("Sorry, but no <HOST> found in regex")
					return False
			finally:
				self._filter.delFailRegex(0)
				logging.getLogger("fail2ban").setLevel(logging.CRITICAL)
		for regex in self._ignoreregex:
			self._filter.delIgnoreRegex(0)
		return found

	def process(self, test_lines):
		"""Test every line of `test_lines` and update the line statistics.

		Lines starting with '# ' and blank lines are skipped and not
		counted as tested.
		"""
		for line in test_lines:
			if line.startswith('# ') or not line.strip():
				# skip comment and empty lines
				continue

			# Use this instance rather than a module-level global, so the
			# class also works when it is not driven by the script below.
			is_ignored = self.testIgnoreRegex(line)
			if is_ignored:
				self._line_stats.ignored_lines.append(line)

			if self.testRegex(line):
				# testRegex() runs with the ignoreregexes installed, so an
				# ignored line is not expected to match here.
				assert(not is_ignored)
				self._line_stats.matched += 1
			else:
				if not is_ignored:
					self._line_stats.missed_lines.append(line)
			self._line_stats.tested += 1

	def printLines(self, ltype):
		"""Print the collected lines of kind `ltype` ('ignored' or 'missed').

		With 20 or more lines only a hint is printed, unless the
		matching --print-all-<ltype> option was given.
		"""
		lstats = self._line_stats
		assert(len(lstats.missed_lines) == lstats.tested - (lstats.matched + lstats.ignored))
		l = lstats[ltype + '_lines']
		if len(l):
			header = "%s line(s):" % (ltype.capitalize(),)
			if len(l) < 20 or getattr(self, '_print_all_' + ltype):
				pprint_list([x.rstrip() for x in l], header)
			else:
				print("%s: too many to print. Use --print-all-%s " \
					"to print all %d lines" % (header, ltype, len(l)))

	def printStats(self):
		"""Print the per-regex, date-template and per-line summary.

		Always returns True.
		"""
		print("")
		print("Results")
		print("=======")

		def print_failregexes(title, failregexes):
			# Print title
			total, out = 0, []
			for cnt, failregex in enumerate(failregexes):
				match = failregex.getStats()
				total += match
				if (match or self._verbose):
					out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))

				if self._verbose and len(failregex.getIPList()):
					for ip in failregex.getIPList():
						# ip[1] is passed to time.localtime(); ip[2] is the
						# "already matched" flag appended by testRegex().
						timeTuple = time.localtime(ip[1])
						timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
						out.append(" %s %s%s" % (
							ip[0],
							timeString,
							ip[2] and " (already matched)" or ""))

			print("\n%s: %d total" % (title, total))
			pprint_list(out, " #) [# of hits] regular expression")
			return total

		# Print title
		print_failregexes("Failregex", self._failregex)
		print_failregexes("Ignoreregex", self._ignoreregex)

		print("\nDate template hits:")
		out = []
		for template in self._filter.dateDetector.getTemplates():
			if self._verbose or template.getHits():
				out.append("[%d] %s" % (template.getHits(), template.getName()))
		pprint_list(out, "[# of hits] date format")

		print("\nLines: %s" % self._line_stats)

		self.printLines('ignored')
		self.printLines('missed')

		return True


if __name__ == "__main__":

	parser = get_opt_parser()
	(opts, args) = parser.parse_args()

	fail2banRegex = Fail2banRegex(opts)

	# We need 2 or 3 parameters
	if len(args) not in (2, 3):
		sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
		parser.print_help()
		sys.exit(-1)

	print("")
	print("Running tests")
	print("=============")
	print("")

	cmd_log, cmd_regex = args[:2]

	if len(args) == 3:
		fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1)

	fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1)

	if os.path.isfile(cmd_log):
		try:
			hdlr = open(cmd_log)
			print("Use log file : %s" % cmd_log)
			# Close the handle as soon as the lines are read instead of
			# leaving it open for the rest of the run.
			try:
				test_lines = hdlr.readlines()
			finally:
				hdlr.close()
		except IOError:
			print(sys.exc_info()[1])
			sys.exit(-1)
	else:
		print("Use single line : %s" % shortstr(cmd_log))
		test_lines = [ cmd_log ]
	print("")

	fail2banRegex.process(test_lines)

	fail2banRegex.printStats() or sys.exit(-1)