diff --git a/fail2ban-regex b/fail2ban-regex index a0a90b05..af38d4d9 100755 --- a/fail2ban-regex +++ b/fail2ban-regex @@ -17,9 +17,17 @@ # You should have received a copy of the GNU General Public License # along with Fail2Ban; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +""" +Fail2Ban reads log file that contains password failure report +and bans the corresponding IP addresses using firewall rules. + +This tools can test regular expressions for "fail2ban". + +Report bugs to https://github.com/fail2ban/fail2ban/issues +""" __author__ = "Cyril Jaquier, Yaroslav Halchenko" -__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012 Yaroslav Halchenko" +__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2013 Yaroslav Halchenko" __license__ = "GPL" import getopt, sys, time, logging, os @@ -37,202 +45,201 @@ from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderErro from server.filter import Filter from server.failregex import RegexException +from optparse import OptionParser, Option + # Gets the instance of the logger. logSys = logging.getLogger("fail2ban.regex") -class RegexStat: +def shortstr(s, l=53): + """Return shortened string + """ + if len(s) > l: + return s[:l-3] + '...' + return s + +def pprint_list(l, header=None): + if not len(l): + return + if header: + s = "|- %s\n" % header + else: + s = '' + print s + "| " + "\n| ".join(l) + '\n`-' + +def get_opt_parser(): + # use module docstring for help output + p = OptionParser( + usage="%s [OPTIONS] [IGNOREREGEX]\n" % sys.argv[0] + __doc__ + + """ +LOG: + string a string representing a log line + filename path to a log file (/var/log/auth.log) + +REGEX: + string a string representing a 'failregex' + filename path to a filter file (filter.d/sshd.conf) + +IGNOREREGEX: + string a string representing an 'ignoreregex' + filename path to a filter file (filter.d/sshd.conf) +""", + version="%prog " + version) + + p.add_options([ + Option('-l', "--log-level", type="choice", + dest="log_level", + choices=('debug', 'info', 'warn', 'error', 'fatal'), + default=None, + help="Log level for the Fail2Ban logger to use"), + Option("-v", "--verbose", action='store_true', + help="Be verbose in output"), + Option("--print-all-missed", action='store_true', + help="Either to print all missed lines"), + Option("--print-all-ignored", action='store_true', + help="Either to print all ignored lines"), + + ]) + + return p + + +class RegexStat(object): def __init__(self, failregex): - self.__stats = 0 - self.__failregex = failregex - self.__ipList = list() + self._stats = 0 + self._failregex = failregex + self._ipList = list() def __str__(self): return "%s(%r) %d failed: %s" \ - % (self.__class__, self.__failregex, self.__stats, self.__ipList) + % (self.__class__, self._failregex, self._stats, self._ipList) def inc(self): - self.__stats += 1 + self._stats += 1 def getStats(self): - return self.__stats + return self._stats def getFailRegex(self): - return self.__failregex + return self._failregex def appendIP(self, value): - self.__ipList.extend(value) + self._ipList.extend(value) def getIPList(self): - return self.__ipList + return self._ipList -class Fail2banRegex: +class LineStats(object): + """Just a convenience container for stats + """ + def __init__(self): + self.tested = self.matched = 0 + self.missed_lines = [] + self.ignored_lines = [] + + def __str__(self): + return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self + + @property + def ignored(self): + return len(self.ignored_lines) + + @property + def missed(self): + return self.tested - (self.ignored + self.matched) + + # just for convenient str + def __getitem__(self, key): + return getattr(self, key) + + +class Fail2banRegex(object): + + # ??? test = None CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"} - def __init__(self): - self.__filter = Filter(None) - self.__ignoreregex = list() - self.__failregex = list() - self.__verbose = False + def __init__(self, opts): + self._verbose = opts.verbose + self._print_all_missed = opts.print_all_missed + self._print_all_ignored = opts.print_all_ignored + + self._filter = Filter(None) + self._ignoreregex = list() + self._failregex = list() + self._line_stats = LineStats() + # Setup logging logging.getLogger("fail2ban").handlers = [] - self.__hdlr = logging.StreamHandler(Fail2banRegex.test) + self._hdlr = logging.StreamHandler(Fail2banRegex.test) # set a format which is simpler for console use formatter = logging.Formatter("%(message)s") # tell the handler to use this format - self.__hdlr.setFormatter(formatter) - self.__logging_level = self.__verbose and logging.DEBUG or logging.WARN - logging.getLogger("fail2ban").addHandler(self.__hdlr) + self._hdlr.setFormatter(formatter) + self._logging_level = self._verbose and logging.DEBUG or logging.WARN + logging.getLogger("fail2ban").addHandler(self._hdlr) logging.getLogger("fail2ban").setLevel(logging.ERROR) - #@staticmethod - def dispVersion(): - print "Fail2Ban v" + version - print - print "Copyright (c) 2004-2008 Cyril Jaquier" - print "Copyright of modifications held by their respective authors." - print "Licensed under the GNU General Public License v2 (GPL)." - print - print "Written by Cyril Jaquier ." - print "Many contributions by Yaroslav O. Halchenko ." - dispVersion = staticmethod(dispVersion) - #@staticmethod - def dispUsage(): - print "Usage: "+sys.argv[0]+" [OPTIONS] [IGNOREREGEX]" - print - print "Fail2Ban v" + version + " reads log file that contains password failure report" - print "and bans the corresponding IP addresses using firewall rules." - print - print "This tools can test regular expressions for \"fail2ban\"." - print - print "Options:" - print " -h, --help display this help message" - print " -V, --version print the version" - print " -v, --verbose verbose output" - print - print "Log:" - print " string a string representing a log line" - print " filename path to a log file (/var/log/auth.log)" - print - print "Regex:" - print " string a string representing a 'failregex'" - print " filename path to a filter file (filter.d/sshd.conf)" - print - print "IgnoreRegex:" - print " string a string representing an 'ignoreregex'" - print " filename path to a filter file (filter.d/sshd.conf)" - print - print "Report bugs to https://github.com/fail2ban/fail2ban/issues" - dispUsage = staticmethod(dispUsage) - - def getCmdLineOptions(self, optList): - """ Gets the command line options - """ - for opt in optList: - if opt[0] in ["-h", "--help"]: - self.dispUsage() - sys.exit(0) - elif opt[0] in ["-V", "--version"]: - self.dispVersion() - sys.exit(0) - elif opt[0] in ["-v", "--verbose"]: - self.__verbose = True - - #@staticmethod - def logIsFile(value): - return os.path.isfile(value) - logIsFile = staticmethod(logIsFile) - - def readIgnoreRegex(self, value): + def readRegex(self, value, regextype): + assert(regextype in ('fail', 'ignore')) + regex = regextype + 'regex' if os.path.isfile(value): reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS) try: reader.read(value) - print "Use ignoreregex file : " + value - self.__ignoreregex = [RegexStat(m) - for m in reader.get("Definition", "ignoreregex").split('\n')] + print "Use %11s file : %s" % (regex, value) + # TODO: reuse functionality in client + regex_values = [RegexStat(m) + for m in reader.get("Definition", regex).split('\n')] except NoSectionError: - print "No [Definition] section in " + value - print + print "No [Definition] section in %s" % value return False except NoOptionError: - print "No failregex option in " + value - print + print "No %s option in %s" % (regex, value) return False except MissingSectionHeaderError: - print "No section headers in " + value - print + print "No section headers in %s" % value return False else: - if len(value) > 53: - stripReg = value[0:50] + "..." - else: - stripReg = value - print "Use ignoreregex line : " + stripReg - self.__ignoreregex = [RegexStat(value)] - return True + print "Use %11s line : %s" % (regex, shortstr(value)) + regex_values = [RegexStat(value)] - def readRegex(self, value): - if os.path.isfile(value): - reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS) - try: - reader.read(value) - print "Use regex file : " + value - self.__failregex = [RegexStat(m) - for m in reader.get("Definition", "failregex").split('\n')] - except NoSectionError: - print "No [Definition] section in " + value - print - return False - except NoOptionError: - print "No failregex option in " + value - print - return False - except MissingSectionHeaderError: - print "No section headers in " + value - print - return False - else: - if len(value) > 53: - stripReg = value[0:50] + "..." - else: - stripReg = value - print "Use regex line : " + stripReg - self.__failregex = [RegexStat(value)] + setattr(self, "_" + regex, regex_values) return True def testIgnoreRegex(self, line): found = False - for regex in self.__ignoreregex: - logging.getLogger("fail2ban").setLevel(self.__logging_level) + for regex in self._ignoreregex: + logging.getLogger("fail2ban").setLevel(self._logging_level) try: - self.__filter.addIgnoreRegex(regex.getFailRegex()) + self._filter.addIgnoreRegex(regex.getFailRegex()) try: - ret = self.__filter.ignoreLine(line) + ret = self._filter.ignoreLine(line) if ret: + found = True regex.inc() except RegexException, e: print e return False finally: - self.__filter.delIgnoreRegex(0) - logging.getLogger("fail2ban").setLevel(self.__logging_level) + self._filter.delIgnoreRegex(0) + logging.getLogger("fail2ban").setLevel(self._logging_level) + return found def testRegex(self, line): found = False - for regex in self.__ignoreregex: - self.__filter.addIgnoreRegex(regex.getFailRegex()) - for regex in self.__failregex: - logging.getLogger("fail2ban").setLevel(logging.DEBUG) + for regex in self._ignoreregex: + self._filter.addIgnoreRegex(regex.getFailRegex()) + for regex in self._failregex: + # logging.getLogger("fail2ban").setLevel(logging.DEBUG) try: - self.__filter.addFailRegex(regex.getFailRegex()) + self._filter.addFailRegex(regex.getFailRegex()) try: - ret = self.__filter.processLine(line) - if not len(ret) == 0: + ret = self._filter.processLine(line) + if len(ret): if found == True: ret[0].append(True) else: @@ -247,16 +254,47 @@ class Fail2banRegex: print "Sorry, but no found in regex" return False finally: - self.__filter.delFailRegex(0) + self._filter.delFailRegex(0) logging.getLogger("fail2ban").setLevel(logging.CRITICAL) - for regex in self.__ignoreregex: - self.__filter.delIgnoreRegex(0) + for regex in self._ignoreregex: + self._filter.delIgnoreRegex(0) + return found + + + def process(self, test_lines): + + for line in test_lines: + if line.startswith('# ') or not line.strip(): + # skip comment and empty lines + continue + is_ignored = fail2banRegex.testIgnoreRegex(line) + if is_ignored: + self._line_stats.ignored_lines.append(line) + + if fail2banRegex.testRegex(line): + assert(not is_ignored) + self._line_stats.matched += 1 + else: + if not is_ignored: + self._line_stats.missed_lines.append(line) + self._line_stats.tested += 1 + + def printLines(self, ltype): + lstats = self._line_stats + assert(len(lstats.missed_lines) == lstats.tested - (lstats.matched + lstats.ignored)) + l = lstats[ltype + '_lines'] + if len(l): + header = "%s line(s):" % (ltype.capitalize(),) + if len(l) < 20 or getattr(self, '_print_all_' + ltype): + pprint_list([x.rstrip() for x in l], header) + else: + print "%s: too many to print. Use --print-all-%s " \ + "to print all %d lines" % (header, ltype, len(l)) def printStats(self): print print "Results" print "=======" - print def print_failregexes(title, failregexes): # Print title @@ -264,106 +302,78 @@ class Fail2banRegex: for cnt, failregex in enumerate(failregexes): match = failregex.getStats() total += match - if (match or self.__verbose): - out.append("| %d) [%d] %s" % (cnt+1, match, failregex.getFailRegex())) - print "%s: %d total" % (title, total) - if len(out): - print "|- #) [# of hits] regular expression" - print '\n'.join(out) - print '`-' - print - return total + if (match or self._verbose): + out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex())) - # Print title - total = print_failregexes("Failregex", self.__failregex) - _ = print_failregexes("Ignoreregex", self.__ignoreregex) - - print "Summary" - print "=======" - print - - if total == 0: - print "Sorry, no match" - print - print "Look at the above section 'Running tests' which could contain important" - print "information." - return False - else: - # Print stats - print "Addresses found:" - for cnt, failregex in enumerate(self.__failregex): - if self.__verbose or len(failregex.getIPList()): - print "[%d]" % (cnt+1) + if self._verbose and len(failregex.getIPList()): for ip in failregex.getIPList(): timeTuple = time.localtime(ip[1]) timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple) - print " %s (%s)%s" % ( - ip[0], timeString, ip[2] and " (already matched)" or "") - print + out.append(" %s %s%s" % ( + ip[0], timeString, ip[2] and " (already matched)" or "")) - print "Date template hits:" - for template in self.__filter.dateDetector.getTemplates(): - if self.__verbose or template.getHits(): - print `template.getHits()` + " hit(s): " + template.getName() - print + print "\n%s: %d total" % (title, total) + pprint_list(out, " #) [# of hits] regular expression") + return total - print "Success, the total number of match is " + str(total) - print - print "However, look at the above section 'Running tests' which could contain important" - print "information." - return True + # Print title + total = print_failregexes("Failregex", self._failregex) + _ = print_failregexes("Ignoreregex", self._ignoreregex) + + + print "\nDate template hits:" + out = [] + for template in self._filter.dateDetector.getTemplates(): + if self._verbose or template.getHits(): + out.append("[%d] %s" % (template.getHits(), template.getName())) + pprint_list(out, "[# of hits] date format") + + print "\nLines: %s" % self._line_stats + + self.printLines('ignored') + self.printLines('missed') + + return True if __name__ == "__main__": - fail2banRegex = Fail2banRegex() - # Reads the command line options. - try: - cmdOpts = 'hVcv' - cmdLongOpts = ['help', 'version', 'verbose'] - optList, args = getopt.getopt(sys.argv[1:], cmdOpts, cmdLongOpts) - except getopt.GetoptError: - fail2banRegex.dispUsage() - sys.exit(-1) - # Process command line - fail2banRegex.getCmdLineOptions(optList) + + parser = get_opt_parser() + (opts, args) = parser.parse_args() + + fail2banRegex = Fail2banRegex(opts) # We need 2 or 3 parameters if not len(args) in (2, 3): - fail2banRegex.dispUsage() + sys.stderr.write("ERROR: provide both and .\n\n") + parser.print_help() sys.exit(-1) + + print + print "Running tests" + print "=============" + print + + cmd_log, cmd_regex = args[:2] + + if len(args) == 3: + fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1) + + fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1) + + if os.path.isfile(cmd_log): + try: + hdlr = open(cmd_log) + print "Use log file : %s" % cmd_log + test_lines = hdlr.readlines() + except IOError, e: + print e + sys.exit(-1) else: - print - print "Running tests" - print "=============" - print + print "Use single line : %s" % shortstr(cmd_log) + test_lines = [ cmd_log ] + print - cmd_log, cmd_regex = args[:2] + fail2banRegex.process(test_lines) - if len(args) == 3: - fail2banRegex.readIgnoreRegex(args[2]) or sys.exit(-1) - - fail2banRegex.readRegex(cmd_regex) or sys.exit(-1) - - if fail2banRegex.logIsFile(cmd_log): - try: - hdlr = open(cmd_log) - print "Use log file : " + cmd_log - print - for line in hdlr: - fail2banRegex.testIgnoreRegex(line) - fail2banRegex.testRegex(line) - except IOError, e: - print e - print - sys.exit(-1) - else: - if len(sys.argv[1]) > 53: - stripLog = cmd_log[0:50] + "..." - else: - stripLog = cmd_log - print "Use single line: " + stripLog - print - fail2banRegex.testIgnoreRegex(cmd_log) - fail2banRegex.testRegex(cmd_log) - - fail2banRegex.printStats() or sys.exit(-1) + fail2banRegex.printStats() or sys.exit(-1)