* 'master' of https://github.com/yarikoptic/fail2ban:
  DOC: Changelog for fail2ban-regex RF
  ENH: fail2ban-regex -- add specification of loglevels to enable
  RF: reworked -regex cmdline tool to use optparse, some unification and enhancement of outputs
  ENH: 'heavydebug' level == 5 for even more debugging in tricky cases

Conflicts:
	ChangeLog
pull/265/merge
Yaroslav Halchenko 2013-06-15 10:52:05 -04:00
commit 1ab0f0f9e3
5 changed files with 273 additions and 234 deletions

View File

@ -22,6 +22,9 @@ ver. 0.8.11 (2013/XX/XXX) - wanna-be-released
Daniel Black & Georgiy Mernov
* filter.d/exim.conf -- regex hardening and extra failure examples in
sample logs
Yaroslav Halchenko
* fail2ban-regex -- refactored to provide more details (missing and
ignored lines, control over logging, etc) while maintaining look&feel.
ver. 0.8.10 (2013/06/12) - wanna-be-secure
-----------

View File

@ -23,3 +23,8 @@
__author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import logging
# Custom debug level
logging.HEAVYDEBUG = 5

View File

@ -17,9 +17,17 @@
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Fail2Ban reads log file that contains password failure report
and bans the corresponding IP addresses using firewall rules.
This tools can test regular expressions for "fail2ban".
Report bugs to https://github.com/fail2ban/fail2ban/issues
"""
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012 Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2013 Yaroslav Halchenko"
__license__ = "GPL"
import getopt, sys, time, logging, os
@ -32,207 +40,194 @@ except ImportError, e:
sys.path.insert(1, "/usr/share/fail2ban")
from common.version import version
from optparse import OptionParser, Option
from client.configparserinc import SafeConfigParserWithIncludes
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
from server.filter import Filter
from server.failregex import RegexException
from testcases.utils import FormatterWithTraceBack
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.regex")
logSys = logging.getLogger("fail2ban")
class RegexStat:
def shortstr(s, l=53):
"""Return shortened string
"""
if len(s) > l:
return s[:l-3] + '...'
return s
def pprint_list(l, header=None):
if not len(l):
return
if header:
s = "|- %s\n" % header
else:
s = ''
print s + "| " + "\n| ".join(l) + '\n`-'
def get_opt_parser():
# use module docstring for help output
p = OptionParser(
usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__
+ """
LOG:
string a string representing a log line
filename path to a log file (/var/log/auth.log)
REGEX:
string a string representing a 'failregex'
filename path to a filter file (filter.d/sshd.conf)
IGNOREREGEX:
string a string representing an 'ignoreregex'
filename path to a filter file (filter.d/sshd.conf)
""",
version="%prog " + version)
p.add_options([
Option('-l', "--log-level", type="choice",
dest="log_level",
choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
default=None,
help="Log level for the Fail2Ban logger to use"),
Option("-v", "--verbose", action='store_true',
help="Be verbose in output"),
Option("--print-all-missed", action='store_true',
help="Either to print all missed lines"),
Option("--print-all-ignored", action='store_true',
help="Either to print all ignored lines"),
Option("-t", "--log-traceback", action='store_true',
help="Enrich log-messages with compressed tracebacks"),
Option("--full-traceback", action='store_true',
help="Either to make the tracebacks full, not compressed (as by default)"),
])
return p
class RegexStat(object):
def __init__(self, failregex):
self.__stats = 0
self.__failregex = failregex
self.__ipList = list()
self._stats = 0
self._failregex = failregex
self._ipList = list()
def __str__(self):
return "%s(%r) %d failed: %s" \
% (self.__class__, self.__failregex, self.__stats, self.__ipList)
% (self.__class__, self._failregex, self._stats, self._ipList)
def inc(self):
self.__stats += 1
self._stats += 1
def getStats(self):
return self.__stats
return self._stats
def getFailRegex(self):
return self.__failregex
return self._failregex
def appendIP(self, value):
self.__ipList.extend(value)
self._ipList.extend(value)
def getIPList(self):
return self.__ipList
return self._ipList
class Fail2banRegex:
test = None
class LineStats(object):
"""Just a convenience container for stats
"""
def __init__(self):
self.tested = self.matched = 0
self.missed_lines = []
self.ignored_lines = []
def __str__(self):
return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self
@property
def ignored(self):
return len(self.ignored_lines)
@property
def missed(self):
return self.tested - (self.ignored + self.matched)
# just for convenient str
def __getitem__(self, key):
return getattr(self, key)
class Fail2banRegex(object):
CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"}
def __init__(self):
self.__filter = Filter(None)
self.__ignoreregex = list()
self.__failregex = list()
self.__verbose = False
# Setup logging
logging.getLogger("fail2ban").handlers = []
self.__hdlr = logging.StreamHandler(Fail2banRegex.test)
# set a format which is simpler for console use
formatter = logging.Formatter("%(message)s")
# tell the handler to use this format
self.__hdlr.setFormatter(formatter)
self.__logging_level = self.__verbose and logging.DEBUG or logging.WARN
logging.getLogger("fail2ban").addHandler(self.__hdlr)
logging.getLogger("fail2ban").setLevel(logging.ERROR)
def __init__(self, opts):
self._verbose = opts.verbose
self._print_all_missed = opts.print_all_missed
self._print_all_ignored = opts.print_all_ignored
#@staticmethod
def dispVersion():
print "Fail2Ban v" + version
print
print "Copyright (c) 2004-2008 Cyril Jaquier"
print "Copyright of modifications held by their respective authors."
print "Licensed under the GNU General Public License v2 (GPL)."
print
print "Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>."
print "Many contributions by Yaroslav O. Halchenko <debian@onerussian.com>."
dispVersion = staticmethod(dispVersion)
self._filter = Filter(None)
self._ignoreregex = list()
self._failregex = list()
self._line_stats = LineStats()
#@staticmethod
def dispUsage():
print "Usage: "+sys.argv[0]+" [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]"
print
print "Fail2Ban v" + version + " reads log file that contains password failure report"
print "and bans the corresponding IP addresses using firewall rules."
print
print "This tools can test regular expressions for \"fail2ban\"."
print
print "Options:"
print " -h, --help display this help message"
print " -V, --version print the version"
print " -v, --verbose verbose output"
print
print "Log:"
print " string a string representing a log line"
print " filename path to a log file (/var/log/auth.log)"
print
print "Regex:"
print " string a string representing a 'failregex'"
print " filename path to a filter file (filter.d/sshd.conf)"
print
print "IgnoreRegex:"
print " string a string representing an 'ignoreregex'"
print " filename path to a filter file (filter.d/sshd.conf)"
print
print "Report bugs to https://github.com/fail2ban/fail2ban/issues"
dispUsage = staticmethod(dispUsage)
def getCmdLineOptions(self, optList):
""" Gets the command line options
"""
for opt in optList:
if opt[0] in ["-h", "--help"]:
self.dispUsage()
sys.exit(0)
elif opt[0] in ["-V", "--version"]:
self.dispVersion()
sys.exit(0)
elif opt[0] in ["-v", "--verbose"]:
self.__verbose = True
#@staticmethod
def logIsFile(value):
return os.path.isfile(value)
logIsFile = staticmethod(logIsFile)
def readIgnoreRegex(self, value):
def readRegex(self, value, regextype):
assert(regextype in ('fail', 'ignore'))
regex = regextype + 'regex'
if os.path.isfile(value):
reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
try:
reader.read(value)
print "Use ignoreregex file : " + value
self.__ignoreregex = [RegexStat(m)
for m in reader.get("Definition", "ignoreregex").split('\n')]
print "Use %11s file : %s" % (regex, value)
# TODO: reuse functionality in client
regex_values = [RegexStat(m)
for m in reader.get("Definition", regex).split('\n')]
except NoSectionError:
print "No [Definition] section in " + value
print
print "No [Definition] section in %s" % value
return False
except NoOptionError:
print "No failregex option in " + value
print
print "No %s option in %s" % (regex, value)
return False
except MissingSectionHeaderError:
print "No section headers in " + value
print
print "No section headers in %s" % value
return False
else:
if len(value) > 53:
stripReg = value[0:50] + "..."
else:
stripReg = value
print "Use ignoreregex line : " + stripReg
self.__ignoreregex = [RegexStat(value)]
return True
print "Use %11s line : %s" % (regex, shortstr(value))
regex_values = [RegexStat(value)]
def readRegex(self, value):
if os.path.isfile(value):
reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
try:
reader.read(value)
print "Use regex file : " + value
self.__failregex = [RegexStat(m)
for m in reader.get("Definition", "failregex").split('\n')]
except NoSectionError:
print "No [Definition] section in " + value
print
return False
except NoOptionError:
print "No failregex option in " + value
print
return False
except MissingSectionHeaderError:
print "No section headers in " + value
print
return False
else:
if len(value) > 53:
stripReg = value[0:50] + "..."
else:
stripReg = value
print "Use regex line : " + stripReg
self.__failregex = [RegexStat(value)]
setattr(self, "_" + regex, regex_values)
return True
def testIgnoreRegex(self, line):
found = False
for regex in self.__ignoreregex:
logging.getLogger("fail2ban").setLevel(self.__logging_level)
for regex in self._ignoreregex:
try:
self.__filter.addIgnoreRegex(regex.getFailRegex())
self._filter.addIgnoreRegex(regex.getFailRegex())
try:
ret = self.__filter.ignoreLine(line)
ret = self._filter.ignoreLine(line)
if ret:
found = True
regex.inc()
except RegexException, e:
print e
return False
finally:
self.__filter.delIgnoreRegex(0)
logging.getLogger("fail2ban").setLevel(self.__logging_level)
self._filter.delIgnoreRegex(0)
return found
def testRegex(self, line):
found = False
for regex in self.__ignoreregex:
self.__filter.addIgnoreRegex(regex.getFailRegex())
for regex in self.__failregex:
logging.getLogger("fail2ban").setLevel(logging.DEBUG)
for regex in self._ignoreregex:
self._filter.addIgnoreRegex(regex.getFailRegex())
for regex in self._failregex:
try:
self.__filter.addFailRegex(regex.getFailRegex())
self._filter.addFailRegex(regex.getFailRegex())
try:
ret = self.__filter.processLine(line)
if not len(ret) == 0:
ret = self._filter.processLine(line)
if len(ret):
if found == True:
ret[0].append(True)
else:
@ -247,16 +242,46 @@ class Fail2banRegex:
print "Sorry, but no <host> found in regex"
return False
finally:
self.__filter.delFailRegex(0)
logging.getLogger("fail2ban").setLevel(logging.CRITICAL)
for regex in self.__ignoreregex:
self.__filter.delIgnoreRegex(0)
self._filter.delFailRegex(0)
for regex in self._ignoreregex:
self._filter.delIgnoreRegex(0)
return found
def process(self, test_lines):
for line in test_lines:
if line.startswith('# ') or not line.strip():
# skip comment and empty lines
continue
is_ignored = fail2banRegex.testIgnoreRegex(line)
if is_ignored:
self._line_stats.ignored_lines.append(line)
if fail2banRegex.testRegex(line):
assert(not is_ignored)
self._line_stats.matched += 1
else:
if not is_ignored:
self._line_stats.missed_lines.append(line)
self._line_stats.tested += 1
def printLines(self, ltype):
lstats = self._line_stats
assert(len(lstats.missed_lines) == lstats.tested - (lstats.matched + lstats.ignored))
l = lstats[ltype + '_lines']
if len(l):
header = "%s line(s):" % (ltype.capitalize(),)
if len(l) < 20 or getattr(self, '_print_all_' + ltype):
pprint_list([x.rstrip() for x in l], header)
else:
print "%s: too many to print. Use --print-all-%s " \
"to print all %d lines" % (header, ltype, len(l))
def printStats(self):
print
print "Results"
print "======="
print
def print_failregexes(title, failregexes):
# Print title
@ -264,106 +289,107 @@ class Fail2banRegex:
for cnt, failregex in enumerate(failregexes):
match = failregex.getStats()
total += match
if (match or self.__verbose):
out.append("| %d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
print "%s: %d total" % (title, total)
if len(out):
print "|- #) [# of hits] regular expression"
print '\n'.join(out)
print '`-'
print
return total
if (match or self._verbose):
out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
# Print title
total = print_failregexes("Failregex", self.__failregex)
_ = print_failregexes("Ignoreregex", self.__ignoreregex)
print "Summary"
print "======="
print
if total == 0:
print "Sorry, no match"
print
print "Look at the above section 'Running tests' which could contain important"
print "information."
return False
else:
# Print stats
print "Addresses found:"
for cnt, failregex in enumerate(self.__failregex):
if self.__verbose or len(failregex.getIPList()):
print "[%d]" % (cnt+1)
if self._verbose and len(failregex.getIPList()):
for ip in failregex.getIPList():
timeTuple = time.localtime(ip[1])
timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
print " %s (%s)%s" % (
ip[0], timeString, ip[2] and " (already matched)" or "")
print
out.append(" %s %s%s" % (
ip[0], timeString, ip[2] and " (already matched)" or ""))
print "Date template hits:"
for template in self.__filter.dateDetector.getTemplates():
if self.__verbose or template.getHits():
print `template.getHits()` + " hit(s): " + template.getName()
print
print "\n%s: %d total" % (title, total)
pprint_list(out, " #) [# of hits] regular expression")
return total
print "Success, the total number of match is " + str(total)
print
print "However, look at the above section 'Running tests' which could contain important"
print "information."
return True
# Print title
total = print_failregexes("Failregex", self._failregex)
_ = print_failregexes("Ignoreregex", self._ignoreregex)
print "\nDate template hits:"
out = []
for template in self._filter.dateDetector.getTemplates():
if self._verbose or template.getHits():
out.append("[%d] %s" % (template.getHits(), template.getName()))
pprint_list(out, "[# of hits] date format")
print "\nLines: %s" % self._line_stats
self.printLines('ignored')
self.printLines('missed')
return True
if __name__ == "__main__":
fail2banRegex = Fail2banRegex()
# Reads the command line options.
try:
cmdOpts = 'hVcv'
cmdLongOpts = ['help', 'version', 'verbose']
optList, args = getopt.getopt(sys.argv[1:], cmdOpts, cmdLongOpts)
except getopt.GetoptError:
fail2banRegex.dispUsage()
sys.exit(-1)
# Process command line
fail2banRegex.getCmdLineOptions(optList)
parser = get_opt_parser()
(opts, args) = parser.parse_args()
fail2banRegex = Fail2banRegex(opts)
# We need 2 or 3 parameters
if not len(args) in (2, 3):
fail2banRegex.dispUsage()
sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
parser.print_help()
sys.exit(-1)
# TODO: taken from -testcases -- move common functionality somewhere
if opts.log_level is not None: # pragma: no cover
# so we had explicit settings
logSys.setLevel(getattr(logging, opts.log_level.upper()))
else: # pragma: no cover
# suppress the logging but it would leave unittests' progress dots
# ticking, unless like with '-l fatal' which would be silent
# unless error occurs
logSys.setLevel(getattr(logging, 'FATAL'))
# Add the default logging handler
stdout = logging.StreamHandler(sys.stdout)
fmt = 'D: %(message)s'
if opts.log_traceback:
Formatter = FormatterWithTraceBack
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
else:
print
print "Running tests"
print "============="
print
Formatter = logging.Formatter
cmd_log, cmd_regex = args[:2]
# Custom log format for the verbose tests runs
if opts.verbose > 1: # pragma: no cover
stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
else: # pragma: no cover
# just prefix with the space
stdout.setFormatter(Formatter(fmt))
logSys.addHandler(stdout)
if len(args) == 3:
fail2banRegex.readIgnoreRegex(args[2]) or sys.exit(-1)
print
print "Running tests"
print "============="
print
fail2banRegex.readRegex(cmd_regex) or sys.exit(-1)
cmd_log, cmd_regex = args[:2]
if fail2banRegex.logIsFile(cmd_log):
try:
hdlr = open(cmd_log)
print "Use log file : " + cmd_log
print
for line in hdlr:
fail2banRegex.testIgnoreRegex(line)
fail2banRegex.testRegex(line)
except IOError, e:
print e
print
sys.exit(-1)
else:
if len(sys.argv[1]) > 53:
stripLog = cmd_log[0:50] + "..."
else:
stripLog = cmd_log
print "Use single line: " + stripLog
print
fail2banRegex.testIgnoreRegex(cmd_log)
fail2banRegex.testRegex(cmd_log)
if len(args) == 3:
fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1)
fail2banRegex.printStats() or sys.exit(-1)
fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1)
if os.path.isfile(cmd_log):
try:
hdlr = open(cmd_log)
print "Use log file : %s" % cmd_log
test_lines = hdlr.readlines()
except IOError, e:
print e
sys.exit(-1)
else:
print "Use single line : %s" % shortstr(cmd_log)
test_lines = [ cmd_log ]
print
fail2banRegex.process(test_lines)
fail2banRegex.printStats() or sys.exit(-1)

View File

@ -52,7 +52,7 @@ def get_opt_parser():
p.add_options([
Option('-l', "--log-level", type="choice",
dest="log_level",
choices=('debug', 'info', 'warn', 'error', 'fatal'),
choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
default=None,
help="Log level for the logger to use during running tests"),
Option('-n', "--no-network", action="store_true",
@ -76,7 +76,8 @@ parser = get_opt_parser()
logSys = logging.getLogger("fail2ban")
# Numerical level of verbosity corresponding to a log "level"
verbosity = {'debug': 3,
verbosity = {'heavydebug': 4,
'debug': 3,
'info': 2,
'warn': 1,
'error': 1,

View File

@ -291,6 +291,8 @@ class Filter(JailThread):
except UnicodeDecodeError:
l = line
l = l.rstrip('\r\n')
logSys.log(5, "Working on line %r", l)
timeMatch = self.dateDetector.matchTime(l)
if timeMatch:
# Lets split into time part and log part of the line
@ -355,6 +357,8 @@ class Filter(JailThread):
if failRegex.hasMatched():
# The failregex matched.
date = self.dateDetector.getUnixTime(timeLine)
logSys.log(7, "Date: %r, message: %r",
timeLine, logLine)
if date is None:
logSys.debug("Found a match for %r but no valid date/time "
"found for %r. Please file a detailed issue on"