mirror of https://github.com/fail2ban/fail2ban
Merge pull request #1250 from sebres/_sb/fail2ban-regex-coverage
RF: fail2ban-regex code moved into client/ codebase, unittestedpull/1251/head
commit
aa0588dd1a
|
@ -29,569 +29,6 @@ __author__ = "Fail2Ban Developers"
|
||||||
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2014 Yaroslav Halchenko"
|
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2014 Yaroslav Halchenko"
|
||||||
__license__ = "GPL"
|
__license__ = "GPL"
|
||||||
|
|
||||||
import getopt
|
from fail2ban.client.fail2banregex import exec_command_line
|
||||||
import locale
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import shlex
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import time
|
|
||||||
import urllib
|
|
||||||
from optparse import OptionParser, Option
|
|
||||||
|
|
||||||
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
|
exec_command_line()
|
||||||
|
|
||||||
try:
|
|
||||||
from systemd import journal
|
|
||||||
from fail2ban.server.filtersystemd import FilterSystemd
|
|
||||||
except ImportError:
|
|
||||||
journal = None
|
|
||||||
|
|
||||||
from fail2ban.version import version
|
|
||||||
from fail2ban.client.filterreader import FilterReader
|
|
||||||
from fail2ban.server.filter import Filter
|
|
||||||
from fail2ban.server.failregex import RegexException
|
|
||||||
|
|
||||||
from fail2ban.helpers import FormatterWithTraceBack, getLogger
|
|
||||||
# Gets the instance of the logger.
|
|
||||||
logSys = getLogger("fail2ban")
|
|
||||||
|
|
||||||
def debuggexURL(sample, regex):
|
|
||||||
q = urllib.urlencode({ 're': regex.replace('<HOST>', '(?&.ipv4)'),
|
|
||||||
'str': sample,
|
|
||||||
'flavor': 'python' })
|
|
||||||
return 'http://www.debuggex.com/?' + q
|
|
||||||
|
|
||||||
def shortstr(s, l=53):
|
|
||||||
"""Return shortened string
|
|
||||||
"""
|
|
||||||
if len(s) > l:
|
|
||||||
return s[:l-3] + '...'
|
|
||||||
return s
|
|
||||||
|
|
||||||
def pprint_list(l, header=None):
|
|
||||||
if not len(l):
|
|
||||||
return
|
|
||||||
if header:
|
|
||||||
s = "|- %s\n" % header
|
|
||||||
else:
|
|
||||||
s = ''
|
|
||||||
print s + "| " + "\n| ".join(l) + '\n`-'
|
|
||||||
|
|
||||||
def file_lines_gen(hdlr):
|
|
||||||
for line in hdlr:
|
|
||||||
try:
|
|
||||||
line = line.decode(fail2banRegex.encoding, 'strict')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
logSys.warning(
|
|
||||||
"Error decoding line from '%s' with '%s'."
|
|
||||||
" Consider setting logencoding=utf-8 (or another appropriate"
|
|
||||||
" encoding) for this jail. Continuing"
|
|
||||||
" to process line ignoring invalid characters: %r" %
|
|
||||||
('<LOG>', fail2banRegex.encoding, line))
|
|
||||||
# decode with replacing error chars:
|
|
||||||
line = line.decode(fail2banRegex.encoding, 'replace')
|
|
||||||
yield line
|
|
||||||
|
|
||||||
def journal_lines_gen(myjournal):
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
entry = myjournal.get_next()
|
|
||||||
except OSError:
|
|
||||||
continue
|
|
||||||
if not entry:
|
|
||||||
break
|
|
||||||
yield FilterSystemd.formatJournalEntry(entry)
|
|
||||||
|
|
||||||
def get_opt_parser():
|
|
||||||
# use module docstring for help output
|
|
||||||
p = OptionParser(
|
|
||||||
usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__
|
|
||||||
+ """
|
|
||||||
LOG:
|
|
||||||
string a string representing a log line
|
|
||||||
filename path to a log file (/var/log/auth.log)
|
|
||||||
"systemd-journal" search systemd journal (systemd-python required)
|
|
||||||
|
|
||||||
REGEX:
|
|
||||||
string a string representing a 'failregex'
|
|
||||||
filename path to a filter file (filter.d/sshd.conf)
|
|
||||||
|
|
||||||
IGNOREREGEX:
|
|
||||||
string a string representing an 'ignoreregex'
|
|
||||||
filename path to a filter file (filter.d/sshd.conf)
|
|
||||||
|
|
||||||
Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
|
|
||||||
Copyright of modifications held by their respective authors.
|
|
||||||
Licensed under the GNU General Public License v2 (GPL).
|
|
||||||
|
|
||||||
Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>.
|
|
||||||
Many contributions by Yaroslav O. Halchenko and Steven Hiscocks.
|
|
||||||
|
|
||||||
Report bugs to https://github.com/fail2ban/fail2ban/issues
|
|
||||||
""",
|
|
||||||
version="%prog " + version)
|
|
||||||
|
|
||||||
p.add_options([
|
|
||||||
Option("-d", "--datepattern",
|
|
||||||
help="set custom pattern used to match date/times"),
|
|
||||||
Option("-e", "--encoding",
|
|
||||||
help="File encoding. Default: system locale"),
|
|
||||||
Option("-L", "--maxlines", type=int, default=0,
|
|
||||||
help="maxlines for multi-line regex"),
|
|
||||||
Option("-m", "--journalmatch",
|
|
||||||
help="journalctl style matches overriding filter file. "
|
|
||||||
"\"systemd-journal\" only"),
|
|
||||||
Option('-l', "--log-level", type="choice",
|
|
||||||
dest="log_level",
|
|
||||||
choices=('heavydebug', 'debug', 'info', 'notice', 'warning', 'error', 'critical'),
|
|
||||||
default=None,
|
|
||||||
help="Log level for the Fail2Ban logger to use"),
|
|
||||||
Option("-v", "--verbose", action='store_true',
|
|
||||||
help="Be verbose in output"),
|
|
||||||
Option("-D", "--debuggex", action='store_true',
|
|
||||||
help="Produce debuggex.com urls for debugging there"),
|
|
||||||
Option("--print-no-missed", action='store_true',
|
|
||||||
help="Do not print any missed lines"),
|
|
||||||
Option("--print-no-ignored", action='store_true',
|
|
||||||
help="Do not print any ignored lines"),
|
|
||||||
Option("--print-all-matched", action='store_true',
|
|
||||||
help="Print all matched lines"),
|
|
||||||
Option("--print-all-missed", action='store_true',
|
|
||||||
help="Print all missed lines, no matter how many"),
|
|
||||||
Option("--print-all-ignored", action='store_true',
|
|
||||||
help="Print all ignored lines, no matter how many"),
|
|
||||||
Option("-t", "--log-traceback", action='store_true',
|
|
||||||
help="Enrich log-messages with compressed tracebacks"),
|
|
||||||
Option("--full-traceback", action='store_true',
|
|
||||||
help="Either to make the tracebacks full, not compressed (as by default)"),
|
|
||||||
])
|
|
||||||
|
|
||||||
return p
|
|
||||||
|
|
||||||
|
|
||||||
class RegexStat(object):
|
|
||||||
|
|
||||||
def __init__(self, failregex):
|
|
||||||
self._stats = 0
|
|
||||||
self._failregex = failregex
|
|
||||||
self._ipList = list()
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return "%s(%r) %d failed: %s" \
|
|
||||||
% (self.__class__, self._failregex, self._stats, self._ipList)
|
|
||||||
|
|
||||||
def inc(self):
|
|
||||||
self._stats += 1
|
|
||||||
|
|
||||||
def getStats(self):
|
|
||||||
return self._stats
|
|
||||||
|
|
||||||
def getFailRegex(self):
|
|
||||||
return self._failregex
|
|
||||||
|
|
||||||
def appendIP(self, value):
|
|
||||||
self._ipList.append(value)
|
|
||||||
|
|
||||||
def getIPList(self):
|
|
||||||
return self._ipList
|
|
||||||
|
|
||||||
|
|
||||||
class LineStats(object):
|
|
||||||
"""Just a convenience container for stats
|
|
||||||
"""
|
|
||||||
def __init__(self):
|
|
||||||
self.tested = self.matched = 0
|
|
||||||
self.matched_lines = []
|
|
||||||
self.missed = 0
|
|
||||||
self.missed_lines = []
|
|
||||||
self.missed_lines_timeextracted = []
|
|
||||||
self.ignored = 0
|
|
||||||
self.ignored_lines = []
|
|
||||||
self.ignored_lines_timeextracted = []
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self
|
|
||||||
|
|
||||||
# just for convenient str
|
|
||||||
def __getitem__(self, key):
|
|
||||||
return getattr(self, key)
|
|
||||||
|
|
||||||
|
|
||||||
class Fail2banRegex(object):
|
|
||||||
|
|
||||||
def __init__(self, opts):
|
|
||||||
self._verbose = opts.verbose
|
|
||||||
self._debuggex = opts.debuggex
|
|
||||||
self._maxlines = 20
|
|
||||||
self._print_no_missed = opts.print_no_missed
|
|
||||||
self._print_no_ignored = opts.print_no_ignored
|
|
||||||
self._print_all_matched = opts.print_all_matched
|
|
||||||
self._print_all_missed = opts.print_all_missed
|
|
||||||
self._print_all_ignored = opts.print_all_ignored
|
|
||||||
self._maxlines_set = False # so we allow to override maxlines in cmdline
|
|
||||||
self._datepattern_set = False
|
|
||||||
self._journalmatch = None
|
|
||||||
|
|
||||||
self.share_config=dict()
|
|
||||||
self._filter = Filter(None)
|
|
||||||
self._ignoreregex = list()
|
|
||||||
self._failregex = list()
|
|
||||||
self._time_elapsed = None
|
|
||||||
self._line_stats = LineStats()
|
|
||||||
|
|
||||||
if opts.maxlines:
|
|
||||||
self.setMaxLines(opts.maxlines)
|
|
||||||
if opts.journalmatch is not None:
|
|
||||||
self.setJournalMatch(opts.journalmatch.split())
|
|
||||||
if opts.datepattern:
|
|
||||||
self.setDatePattern(opts.datepattern)
|
|
||||||
if opts.encoding:
|
|
||||||
self.encoding = opts.encoding
|
|
||||||
else:
|
|
||||||
self.encoding = locale.getpreferredencoding()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def setDatePattern(self, pattern):
|
|
||||||
if not self._datepattern_set:
|
|
||||||
self._filter.setDatePattern(pattern)
|
|
||||||
self._datepattern_set = True
|
|
||||||
if pattern is not None:
|
|
||||||
print "Use datepattern : %s" % (
|
|
||||||
self._filter.getDatePattern()[1], )
|
|
||||||
|
|
||||||
def setMaxLines(self, v):
|
|
||||||
if not self._maxlines_set:
|
|
||||||
self._filter.setMaxLines(int(v))
|
|
||||||
self._maxlines_set = True
|
|
||||||
print "Use maxlines : %d" % self._filter.getMaxLines()
|
|
||||||
|
|
||||||
def setJournalMatch(self, v):
|
|
||||||
if self._journalmatch is None:
|
|
||||||
self._journalmatch = v
|
|
||||||
|
|
||||||
def readRegex(self, value, regextype):
|
|
||||||
assert(regextype in ('fail', 'ignore'))
|
|
||||||
regex = regextype + 'regex'
|
|
||||||
if os.path.isfile(value) or os.path.isfile(value + '.conf'):
|
|
||||||
if os.path.basename(os.path.dirname(value)) == 'filter.d':
|
|
||||||
## within filter.d folder - use standard loading algorithm to load filter completely (with .local etc.):
|
|
||||||
basedir = os.path.dirname(os.path.dirname(value))
|
|
||||||
value = os.path.splitext(os.path.basename(value))[0]
|
|
||||||
print "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir)
|
|
||||||
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir)
|
|
||||||
if not reader.read():
|
|
||||||
print "ERROR: failed to load filter %s" % value
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
## foreign file - readexplicit this file and includes if possible:
|
|
||||||
print "Use %11s file : %s" % (regex, value)
|
|
||||||
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config)
|
|
||||||
reader.setBaseDir(None)
|
|
||||||
if not reader.readexplicit():
|
|
||||||
print "ERROR: failed to read %s" % value
|
|
||||||
return False
|
|
||||||
reader.getOptions(None)
|
|
||||||
readercommands = reader.convert()
|
|
||||||
regex_values = [
|
|
||||||
RegexStat(m[3])
|
|
||||||
for m in filter(
|
|
||||||
lambda x: x[0] == 'set' and x[2] == "add%sregex" % regextype,
|
|
||||||
readercommands)]
|
|
||||||
# Read out and set possible value of maxlines
|
|
||||||
for command in readercommands:
|
|
||||||
if command[2] == "maxlines":
|
|
||||||
maxlines = int(command[3])
|
|
||||||
try:
|
|
||||||
self.setMaxLines(maxlines)
|
|
||||||
except ValueError:
|
|
||||||
print "ERROR: Invalid value for maxlines (%(maxlines)r) " \
|
|
||||||
"read from %(value)s" % locals()
|
|
||||||
return False
|
|
||||||
elif command[2] == 'addjournalmatch':
|
|
||||||
journalmatch = command[3:]
|
|
||||||
self.setJournalMatch(journalmatch)
|
|
||||||
elif command[2] == 'datepattern':
|
|
||||||
datepattern = command[3]
|
|
||||||
self.setDatePattern(datepattern)
|
|
||||||
else:
|
|
||||||
print "Use %11s line : %s" % (regex, shortstr(value))
|
|
||||||
regex_values = [RegexStat(value)]
|
|
||||||
|
|
||||||
setattr(self, "_" + regex, regex_values)
|
|
||||||
for regex in regex_values:
|
|
||||||
getattr(
|
|
||||||
self._filter,
|
|
||||||
'add%sRegex' % regextype.title())(regex.getFailRegex())
|
|
||||||
return True
|
|
||||||
|
|
||||||
def testIgnoreRegex(self, line):
|
|
||||||
found = False
|
|
||||||
try:
|
|
||||||
ret = self._filter.ignoreLine([(line, "", "")])
|
|
||||||
if ret is not None:
|
|
||||||
found = True
|
|
||||||
regex = self._ignoreregex[ret].inc()
|
|
||||||
except RegexException, e:
|
|
||||||
print e
|
|
||||||
return False
|
|
||||||
return found
|
|
||||||
|
|
||||||
def testRegex(self, line, date=None):
|
|
||||||
orgLineBuffer = self._filter._Filter__lineBuffer
|
|
||||||
fullBuffer = len(orgLineBuffer) >= self._filter.getMaxLines()
|
|
||||||
try:
|
|
||||||
line, ret = self._filter.processLine(line, date, checkAllRegex=True)
|
|
||||||
for match in ret:
|
|
||||||
# Append True/False flag depending if line was matched by
|
|
||||||
# more than one regex
|
|
||||||
match.append(len(ret)>1)
|
|
||||||
regex = self._failregex[match[0]]
|
|
||||||
regex.inc()
|
|
||||||
regex.appendIP(match)
|
|
||||||
except RegexException, e:
|
|
||||||
print e
|
|
||||||
return False
|
|
||||||
except IndexError:
|
|
||||||
print "Sorry, but no <HOST> found in regex"
|
|
||||||
return False
|
|
||||||
for bufLine in orgLineBuffer[int(fullBuffer):]:
|
|
||||||
if bufLine not in self._filter._Filter__lineBuffer:
|
|
||||||
try:
|
|
||||||
self._line_stats.missed_lines.pop(
|
|
||||||
self._line_stats.missed_lines.index("".join(bufLine)))
|
|
||||||
self._line_stats.missed_lines_timeextracted.pop(
|
|
||||||
self._line_stats.missed_lines_timeextracted.index(
|
|
||||||
"".join(bufLine[::2])))
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
self._line_stats.matched += 1
|
|
||||||
self._line_stats.missed -= 1
|
|
||||||
return line, ret
|
|
||||||
|
|
||||||
def process(self, test_lines):
|
|
||||||
t0 = time.time()
|
|
||||||
for line_no, line in enumerate(test_lines):
|
|
||||||
if isinstance(line, tuple):
|
|
||||||
line_datetimestripped, ret = fail2banRegex.testRegex(
|
|
||||||
line[0], line[1])
|
|
||||||
line = "".join(line[0])
|
|
||||||
else:
|
|
||||||
line = line.rstrip('\r\n')
|
|
||||||
if line.startswith('#') or not line:
|
|
||||||
# skip comment and empty lines
|
|
||||||
continue
|
|
||||||
line_datetimestripped, ret = fail2banRegex.testRegex(line)
|
|
||||||
is_ignored = fail2banRegex.testIgnoreRegex(line_datetimestripped)
|
|
||||||
|
|
||||||
if is_ignored:
|
|
||||||
self._line_stats.ignored += 1
|
|
||||||
if not self._print_no_ignored and (self._print_all_ignored or self._line_stats.ignored <= self._maxlines + 1):
|
|
||||||
self._line_stats.ignored_lines.append(line)
|
|
||||||
self._line_stats.ignored_lines_timeextracted.append(line_datetimestripped)
|
|
||||||
|
|
||||||
if len(ret) > 0:
|
|
||||||
assert(not is_ignored)
|
|
||||||
self._line_stats.matched += 1
|
|
||||||
if self._print_all_matched:
|
|
||||||
self._line_stats.matched_lines.append(line)
|
|
||||||
else:
|
|
||||||
if not is_ignored:
|
|
||||||
self._line_stats.missed += 1
|
|
||||||
if not self._print_no_missed and (self._print_all_missed or self._line_stats.missed <= self._maxlines + 1):
|
|
||||||
self._line_stats.missed_lines.append(line)
|
|
||||||
self._line_stats.missed_lines_timeextracted.append(line_datetimestripped)
|
|
||||||
self._line_stats.tested += 1
|
|
||||||
|
|
||||||
if line_no % 10 == 0 and self._filter.dateDetector is not None:
|
|
||||||
self._filter.dateDetector.sortTemplate()
|
|
||||||
self._time_elapsed = time.time() - t0
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def printLines(self, ltype):
|
|
||||||
lstats = self._line_stats
|
|
||||||
assert(self._line_stats.missed == lstats.tested - (lstats.matched + lstats.ignored))
|
|
||||||
lines = lstats[ltype]
|
|
||||||
l = lstats[ltype + '_lines']
|
|
||||||
if lines:
|
|
||||||
header = "%s line(s):" % (ltype.capitalize(),)
|
|
||||||
if self._debuggex:
|
|
||||||
if ltype == 'missed' or ltype == 'matched':
|
|
||||||
regexlist = self._failregex
|
|
||||||
else:
|
|
||||||
regexlist = self._ignoreregex
|
|
||||||
l = lstats[ltype + '_lines_timeextracted']
|
|
||||||
if lines < self._maxlines or getattr(self, '_print_all_' + ltype):
|
|
||||||
ans = [[]]
|
|
||||||
for arg in [l, regexlist]:
|
|
||||||
ans = [ x + [y] for x in ans for y in arg ]
|
|
||||||
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' + debuggexURL(a[0], a[1].getFailRegex()), ans)
|
|
||||||
pprint_list([x.rstrip() for x in b], header)
|
|
||||||
else:
|
|
||||||
print "%s too many to print. Use --print-all-%s " \
|
|
||||||
"to print all %d lines" % (header, ltype, lines)
|
|
||||||
elif lines < self._maxlines or getattr(self, '_print_all_' + ltype):
|
|
||||||
pprint_list([x.rstrip() for x in l], header)
|
|
||||||
else:
|
|
||||||
print "%s too many to print. Use --print-all-%s " \
|
|
||||||
"to print all %d lines" % (header, ltype, lines)
|
|
||||||
|
|
||||||
def printStats(self):
|
|
||||||
print
|
|
||||||
print "Results"
|
|
||||||
print "======="
|
|
||||||
|
|
||||||
def print_failregexes(title, failregexes):
|
|
||||||
# Print title
|
|
||||||
total, out = 0, []
|
|
||||||
for cnt, failregex in enumerate(failregexes):
|
|
||||||
match = failregex.getStats()
|
|
||||||
total += match
|
|
||||||
if (match or self._verbose):
|
|
||||||
out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
|
|
||||||
|
|
||||||
if self._verbose and len(failregex.getIPList()):
|
|
||||||
for ip in failregex.getIPList():
|
|
||||||
timeTuple = time.localtime(ip[2])
|
|
||||||
timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
|
|
||||||
out.append(
|
|
||||||
" %s %s%s" % (
|
|
||||||
ip[1],
|
|
||||||
timeString,
|
|
||||||
ip[-1] and " (multiple regex matched)" or ""))
|
|
||||||
|
|
||||||
print "\n%s: %d total" % (title, total)
|
|
||||||
pprint_list(out, " #) [# of hits] regular expression")
|
|
||||||
return total
|
|
||||||
|
|
||||||
# Print title
|
|
||||||
total = print_failregexes("Failregex", self._failregex)
|
|
||||||
_ = print_failregexes("Ignoreregex", self._ignoreregex)
|
|
||||||
|
|
||||||
|
|
||||||
if self._filter.dateDetector is not None:
|
|
||||||
print "\nDate template hits:"
|
|
||||||
out = []
|
|
||||||
for template in self._filter.dateDetector.templates:
|
|
||||||
if self._verbose or template.hits:
|
|
||||||
out.append("[%d] %s" % (
|
|
||||||
template.hits, template.name))
|
|
||||||
pprint_list(out, "[# of hits] date format")
|
|
||||||
|
|
||||||
print "\nLines: %s" % self._line_stats,
|
|
||||||
if self._time_elapsed is not None:
|
|
||||||
print "[processed in %.2f sec]" % self._time_elapsed,
|
|
||||||
print
|
|
||||||
|
|
||||||
if self._print_all_matched:
|
|
||||||
self.printLines('matched')
|
|
||||||
if not self._print_no_ignored:
|
|
||||||
self.printLines('ignored')
|
|
||||||
if not self._print_no_missed:
|
|
||||||
self.printLines('missed')
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
|
|
||||||
parser = get_opt_parser()
|
|
||||||
(opts, args) = parser.parse_args()
|
|
||||||
if opts.print_no_missed and opts.print_all_missed:
|
|
||||||
sys.stderr.write("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.\n\n")
|
|
||||||
parser.print_help()
|
|
||||||
sys.exit(-1)
|
|
||||||
if opts.print_no_ignored and opts.print_all_ignored:
|
|
||||||
sys.stderr.write("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.\n\n")
|
|
||||||
parser.print_help()
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
print
|
|
||||||
print "Running tests"
|
|
||||||
print "============="
|
|
||||||
print
|
|
||||||
|
|
||||||
fail2banRegex = Fail2banRegex(opts)
|
|
||||||
|
|
||||||
# We need 2 or 3 parameters
|
|
||||||
if not len(args) in (2, 3):
|
|
||||||
sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
|
|
||||||
parser.print_help()
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
# TODO: taken from -testcases -- move common functionality somewhere
|
|
||||||
if opts.log_level is not None: # pragma: no cover
|
|
||||||
# so we had explicit settings
|
|
||||||
logSys.setLevel(getattr(logging, opts.log_level.upper()))
|
|
||||||
else: # pragma: no cover
|
|
||||||
# suppress the logging but it would leave unittests' progress dots
|
|
||||||
# ticking, unless like with '-l critical' which would be silent
|
|
||||||
# unless error occurs
|
|
||||||
logSys.setLevel(getattr(logging, 'CRITICAL'))
|
|
||||||
|
|
||||||
# Add the default logging handler
|
|
||||||
stdout = logging.StreamHandler(sys.stdout)
|
|
||||||
|
|
||||||
fmt = 'D: %(message)s'
|
|
||||||
|
|
||||||
if opts.log_traceback:
|
|
||||||
Formatter = FormatterWithTraceBack
|
|
||||||
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
|
|
||||||
else:
|
|
||||||
Formatter = logging.Formatter
|
|
||||||
|
|
||||||
# Custom log format for the verbose tests runs
|
|
||||||
if opts.verbose: # pragma: no cover
|
|
||||||
stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
|
|
||||||
else: # pragma: no cover
|
|
||||||
# just prefix with the space
|
|
||||||
stdout.setFormatter(Formatter(fmt))
|
|
||||||
logSys.addHandler(stdout)
|
|
||||||
|
|
||||||
cmd_log, cmd_regex = args[:2]
|
|
||||||
|
|
||||||
fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1)
|
|
||||||
|
|
||||||
if len(args) == 3:
|
|
||||||
fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1)
|
|
||||||
|
|
||||||
if os.path.isfile(cmd_log):
|
|
||||||
try:
|
|
||||||
hdlr = open(cmd_log, 'rb')
|
|
||||||
print "Use log file : %s" % cmd_log
|
|
||||||
print "Use encoding : %s" % fail2banRegex.encoding
|
|
||||||
test_lines = file_lines_gen(hdlr)
|
|
||||||
except IOError, e:
|
|
||||||
print e
|
|
||||||
sys.exit(-1)
|
|
||||||
elif cmd_log == "systemd-journal":
|
|
||||||
if not journal:
|
|
||||||
print "Error: systemd library not found. Exiting..."
|
|
||||||
sys.exit(-1)
|
|
||||||
myjournal = journal.Reader(converters={'__CURSOR': lambda x: x})
|
|
||||||
journalmatch = fail2banRegex._journalmatch
|
|
||||||
fail2banRegex.setDatePattern(None)
|
|
||||||
if journalmatch:
|
|
||||||
try:
|
|
||||||
for element in journalmatch:
|
|
||||||
if element == "+":
|
|
||||||
myjournal.add_disjunction()
|
|
||||||
else:
|
|
||||||
myjournal.add_match(element)
|
|
||||||
except ValueError:
|
|
||||||
print "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch))
|
|
||||||
sys.exit(-1)
|
|
||||||
print "Use journal match : %s" % " ".join(journalmatch)
|
|
||||||
test_lines = journal_lines_gen(myjournal)
|
|
||||||
else:
|
|
||||||
print "Use single line : %s" % shortstr(cmd_log)
|
|
||||||
test_lines = [ cmd_log ]
|
|
||||||
print
|
|
||||||
|
|
||||||
fail2banRegex.process(test_lines)
|
|
||||||
|
|
||||||
fail2banRegex.printStats() or sys.exit(-1)
|
|
||||||
|
|
|
@ -0,0 +1,599 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
||||||
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
||||||
|
#
|
||||||
|
# This file is part of Fail2Ban.
|
||||||
|
#
|
||||||
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# Fail2Ban is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with Fail2Ban; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
"""
|
||||||
|
Fail2Ban reads log file that contains password failure report
|
||||||
|
and bans the corresponding IP addresses using firewall rules.
|
||||||
|
|
||||||
|
This tools can test regular expressions for "fail2ban".
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
__author__ = "Fail2Ban Developers"
|
||||||
|
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2014 Yaroslav Halchenko"
|
||||||
|
__license__ = "GPL"
|
||||||
|
|
||||||
|
import getopt
|
||||||
|
import locale
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shlex
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import time
|
||||||
|
import urllib
|
||||||
|
from optparse import OptionParser, Option
|
||||||
|
|
||||||
|
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
|
||||||
|
|
||||||
|
try:
|
||||||
|
from systemd import journal
|
||||||
|
from ..server.filtersystemd import FilterSystemd
|
||||||
|
except ImportError:
|
||||||
|
journal = None
|
||||||
|
|
||||||
|
from ..version import version
|
||||||
|
from .filterreader import FilterReader
|
||||||
|
from ..server.filter import Filter, FileContainer
|
||||||
|
from ..server.failregex import RegexException
|
||||||
|
|
||||||
|
from ..helpers import FormatterWithTraceBack, getLogger
|
||||||
|
# Gets the instance of the logger.
|
||||||
|
logSys = getLogger("fail2ban")
|
||||||
|
|
||||||
|
def debuggexURL(sample, regex):
|
||||||
|
q = urllib.urlencode({ 're': regex.replace('<HOST>', '(?&.ipv4)'),
|
||||||
|
'str': sample,
|
||||||
|
'flavor': 'python' })
|
||||||
|
return 'http://www.debuggex.com/?' + q
|
||||||
|
|
||||||
|
def output(args):
|
||||||
|
print(args)
|
||||||
|
|
||||||
|
def shortstr(s, l=53):
|
||||||
|
"""Return shortened string
|
||||||
|
"""
|
||||||
|
if len(s) > l:
|
||||||
|
return s[:l-3] + '...'
|
||||||
|
return s
|
||||||
|
|
||||||
|
def pprint_list(l, header=None):
|
||||||
|
if not len(l):
|
||||||
|
return
|
||||||
|
if header:
|
||||||
|
s = "|- %s\n" % header
|
||||||
|
else:
|
||||||
|
s = ''
|
||||||
|
output( s + "| " + "\n| ".join(l) + '\n`-' )
|
||||||
|
|
||||||
|
def journal_lines_gen(myjournal):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
entry = myjournal.get_next()
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if not entry:
|
||||||
|
break
|
||||||
|
yield FilterSystemd.formatJournalEntry(entry)
|
||||||
|
|
||||||
|
def get_opt_parser():
|
||||||
|
# use module docstring for help output
|
||||||
|
p = OptionParser(
|
||||||
|
usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__
|
||||||
|
+ """
|
||||||
|
LOG:
|
||||||
|
string a string representing a log line
|
||||||
|
filename path to a log file (/var/log/auth.log)
|
||||||
|
"systemd-journal" search systemd journal (systemd-python required)
|
||||||
|
|
||||||
|
REGEX:
|
||||||
|
string a string representing a 'failregex'
|
||||||
|
filename path to a filter file (filter.d/sshd.conf)
|
||||||
|
|
||||||
|
IGNOREREGEX:
|
||||||
|
string a string representing an 'ignoreregex'
|
||||||
|
filename path to a filter file (filter.d/sshd.conf)
|
||||||
|
|
||||||
|
Copyright (c) 2004-2008 Cyril Jaquier, 2008- Fail2Ban Contributors
|
||||||
|
Copyright of modifications held by their respective authors.
|
||||||
|
Licensed under the GNU General Public License v2 (GPL).
|
||||||
|
|
||||||
|
Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>.
|
||||||
|
Many contributions by Yaroslav O. Halchenko and Steven Hiscocks.
|
||||||
|
|
||||||
|
Report bugs to https://github.com/fail2ban/fail2ban/issues
|
||||||
|
""",
|
||||||
|
version="%prog " + version)
|
||||||
|
|
||||||
|
p.add_options([
|
||||||
|
Option("-d", "--datepattern",
|
||||||
|
help="set custom pattern used to match date/times"),
|
||||||
|
Option("-e", "--encoding",
|
||||||
|
help="File encoding. Default: system locale"),
|
||||||
|
Option("-L", "--maxlines", type=int, default=0,
|
||||||
|
help="maxlines for multi-line regex"),
|
||||||
|
Option("-m", "--journalmatch",
|
||||||
|
help="journalctl style matches overriding filter file. "
|
||||||
|
"\"systemd-journal\" only"),
|
||||||
|
Option('-l', "--log-level", type="choice",
|
||||||
|
dest="log_level",
|
||||||
|
choices=('heavydebug', 'debug', 'info', 'notice', 'warning', 'error', 'critical'),
|
||||||
|
default=None,
|
||||||
|
help="Log level for the Fail2Ban logger to use"),
|
||||||
|
Option("-v", "--verbose", action='store_true',
|
||||||
|
help="Be verbose in output"),
|
||||||
|
Option("-D", "--debuggex", action='store_true',
|
||||||
|
help="Produce debuggex.com urls for debugging there"),
|
||||||
|
Option("--print-no-missed", action='store_true',
|
||||||
|
help="Do not print any missed lines"),
|
||||||
|
Option("--print-no-ignored", action='store_true',
|
||||||
|
help="Do not print any ignored lines"),
|
||||||
|
Option("--print-all-matched", action='store_true',
|
||||||
|
help="Print all matched lines"),
|
||||||
|
Option("--print-all-missed", action='store_true',
|
||||||
|
help="Print all missed lines, no matter how many"),
|
||||||
|
Option("--print-all-ignored", action='store_true',
|
||||||
|
help="Print all ignored lines, no matter how many"),
|
||||||
|
Option("-t", "--log-traceback", action='store_true',
|
||||||
|
help="Enrich log-messages with compressed tracebacks"),
|
||||||
|
Option("--full-traceback", action='store_true',
|
||||||
|
help="Either to make the tracebacks full, not compressed (as by default)"),
|
||||||
|
])
|
||||||
|
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
class RegexStat(object):
|
||||||
|
|
||||||
|
def __init__(self, failregex):
|
||||||
|
self._stats = 0
|
||||||
|
self._failregex = failregex
|
||||||
|
self._ipList = list()
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "%s(%r) %d failed: %s" \
|
||||||
|
% (self.__class__, self._failregex, self._stats, self._ipList)
|
||||||
|
|
||||||
|
def inc(self):
|
||||||
|
self._stats += 1
|
||||||
|
|
||||||
|
def getStats(self):
|
||||||
|
return self._stats
|
||||||
|
|
||||||
|
def getFailRegex(self):
|
||||||
|
return self._failregex
|
||||||
|
|
||||||
|
def appendIP(self, value):
|
||||||
|
self._ipList.append(value)
|
||||||
|
|
||||||
|
def getIPList(self):
|
||||||
|
return self._ipList
|
||||||
|
|
||||||
|
|
||||||
|
class LineStats(object):
|
||||||
|
"""Just a convenience container for stats
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.tested = self.matched = 0
|
||||||
|
self.matched_lines = []
|
||||||
|
self.missed = 0
|
||||||
|
self.missed_lines = []
|
||||||
|
self.missed_lines_timeextracted = []
|
||||||
|
self.ignored = 0
|
||||||
|
self.ignored_lines = []
|
||||||
|
self.ignored_lines_timeextracted = []
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self
|
||||||
|
|
||||||
|
# just for convenient str
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return getattr(self, key) if hasattr(self, key) else ''
|
||||||
|
|
||||||
|
|
||||||
|
class Fail2banRegex(object):
|
||||||
|
|
||||||
|
def __init__(self, opts):
|
||||||
|
self._verbose = opts.verbose
|
||||||
|
self._debuggex = opts.debuggex
|
||||||
|
self._maxlines = 20
|
||||||
|
self._print_no_missed = opts.print_no_missed
|
||||||
|
self._print_no_ignored = opts.print_no_ignored
|
||||||
|
self._print_all_matched = opts.print_all_matched
|
||||||
|
self._print_all_missed = opts.print_all_missed
|
||||||
|
self._print_all_ignored = opts.print_all_ignored
|
||||||
|
self._maxlines_set = False # so we allow to override maxlines in cmdline
|
||||||
|
self._datepattern_set = False
|
||||||
|
self._journalmatch = None
|
||||||
|
|
||||||
|
self.share_config=dict()
|
||||||
|
self._filter = Filter(None)
|
||||||
|
self._ignoreregex = list()
|
||||||
|
self._failregex = list()
|
||||||
|
self._time_elapsed = None
|
||||||
|
self._line_stats = LineStats()
|
||||||
|
|
||||||
|
if opts.maxlines:
|
||||||
|
self.setMaxLines(opts.maxlines)
|
||||||
|
if opts.journalmatch is not None:
|
||||||
|
self.setJournalMatch(opts.journalmatch.split())
|
||||||
|
if opts.datepattern:
|
||||||
|
self.setDatePattern(opts.datepattern)
|
||||||
|
if opts.encoding:
|
||||||
|
self.encoding = opts.encoding
|
||||||
|
else:
|
||||||
|
self.encoding = locale.getpreferredencoding()
|
||||||
|
|
||||||
|
def decode_line(self, line):
|
||||||
|
return FileContainer.decode_line('<LOG>', self.encoding, line)
|
||||||
|
|
||||||
|
def encode_line(self, line):
|
||||||
|
return line.encode(self.encoding, 'ignore')
|
||||||
|
|
||||||
|
def setDatePattern(self, pattern):
|
||||||
|
if not self._datepattern_set:
|
||||||
|
self._filter.setDatePattern(pattern)
|
||||||
|
self._datepattern_set = True
|
||||||
|
if pattern is not None:
|
||||||
|
output( "Use datepattern : %s" % (
|
||||||
|
self._filter.getDatePattern()[1], ) )
|
||||||
|
|
||||||
|
def setMaxLines(self, v):
|
||||||
|
if not self._maxlines_set:
|
||||||
|
self._filter.setMaxLines(int(v))
|
||||||
|
self._maxlines_set = True
|
||||||
|
output( "Use maxlines : %d" % self._filter.getMaxLines() )
|
||||||
|
|
||||||
|
def setJournalMatch(self, v):
|
||||||
|
if self._journalmatch is None:
|
||||||
|
self._journalmatch = v
|
||||||
|
|
||||||
|
def readRegex(self, value, regextype):
|
||||||
|
assert(regextype in ('fail', 'ignore'))
|
||||||
|
regex = regextype + 'regex'
|
||||||
|
if os.path.isfile(value) or os.path.isfile(value + '.conf'):
|
||||||
|
if os.path.basename(os.path.dirname(value)) == 'filter.d':
|
||||||
|
## within filter.d folder - use standard loading algorithm to load filter completely (with .local etc.):
|
||||||
|
basedir = os.path.dirname(os.path.dirname(value))
|
||||||
|
value = os.path.splitext(os.path.basename(value))[0]
|
||||||
|
output( "Use %11s filter file : %s, basedir: %s" % (regex, value, basedir) )
|
||||||
|
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config, basedir=basedir)
|
||||||
|
if not reader.read():
|
||||||
|
output( "ERROR: failed to load filter %s" % value )
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
## foreign file - readexplicit this file and includes if possible:
|
||||||
|
output( "Use %11s file : %s" % (regex, value) )
|
||||||
|
reader = FilterReader(value, 'fail2ban-regex-jail', {}, share_config=self.share_config)
|
||||||
|
reader.setBaseDir(None)
|
||||||
|
if not reader.readexplicit():
|
||||||
|
output( "ERROR: failed to read %s" % value )
|
||||||
|
return False
|
||||||
|
reader.getOptions(None)
|
||||||
|
readercommands = reader.convert()
|
||||||
|
regex_values = [
|
||||||
|
RegexStat(m[3])
|
||||||
|
for m in filter(
|
||||||
|
lambda x: x[0] == 'set' and x[2] == "add%sregex" % regextype,
|
||||||
|
readercommands)]
|
||||||
|
# Read out and set possible value of maxlines
|
||||||
|
for command in readercommands:
|
||||||
|
if command[2] == "maxlines":
|
||||||
|
maxlines = int(command[3])
|
||||||
|
try:
|
||||||
|
self.setMaxLines(maxlines)
|
||||||
|
except ValueError:
|
||||||
|
output( "ERROR: Invalid value for maxlines (%(maxlines)r) " \
|
||||||
|
"read from %(value)s" % locals() )
|
||||||
|
return False
|
||||||
|
elif command[2] == 'addjournalmatch':
|
||||||
|
journalmatch = command[3:]
|
||||||
|
self.setJournalMatch(journalmatch)
|
||||||
|
elif command[2] == 'datepattern':
|
||||||
|
datepattern = command[3]
|
||||||
|
self.setDatePattern(datepattern)
|
||||||
|
else:
|
||||||
|
output( "Use %11s line : %s" % (regex, shortstr(value)) )
|
||||||
|
regex_values = [RegexStat(value)]
|
||||||
|
|
||||||
|
setattr(self, "_" + regex, regex_values)
|
||||||
|
for regex in regex_values:
|
||||||
|
getattr(
|
||||||
|
self._filter,
|
||||||
|
'add%sRegex' % regextype.title())(regex.getFailRegex())
|
||||||
|
return True
|
||||||
|
|
||||||
|
def testIgnoreRegex(self, line):
|
||||||
|
found = False
|
||||||
|
try:
|
||||||
|
ret = self._filter.ignoreLine([(line, "", "")])
|
||||||
|
if ret is not None:
|
||||||
|
found = True
|
||||||
|
regex = self._ignoreregex[ret].inc()
|
||||||
|
except RegexException, e:
|
||||||
|
output( e )
|
||||||
|
return False
|
||||||
|
return found
|
||||||
|
|
||||||
|
def testRegex(self, line, date=None):
|
||||||
|
orgLineBuffer = self._filter._Filter__lineBuffer
|
||||||
|
fullBuffer = len(orgLineBuffer) >= self._filter.getMaxLines()
|
||||||
|
try:
|
||||||
|
line, ret = self._filter.processLine(line, date, checkAllRegex=True)
|
||||||
|
for match in ret:
|
||||||
|
# Append True/False flag depending if line was matched by
|
||||||
|
# more than one regex
|
||||||
|
match.append(len(ret)>1)
|
||||||
|
regex = self._failregex[match[0]]
|
||||||
|
regex.inc()
|
||||||
|
regex.appendIP(match)
|
||||||
|
except RegexException, e:
|
||||||
|
output( e )
|
||||||
|
return False
|
||||||
|
except IndexError:
|
||||||
|
output( "Sorry, but no <HOST> found in regex" )
|
||||||
|
return False
|
||||||
|
for bufLine in orgLineBuffer[int(fullBuffer):]:
|
||||||
|
if bufLine not in self._filter._Filter__lineBuffer:
|
||||||
|
try:
|
||||||
|
self._line_stats.missed_lines.pop(
|
||||||
|
self._line_stats.missed_lines.index("".join(bufLine)))
|
||||||
|
self._line_stats.missed_lines_timeextracted.pop(
|
||||||
|
self._line_stats.missed_lines_timeextracted.index(
|
||||||
|
"".join(bufLine[::2])))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
self._line_stats.matched += 1
|
||||||
|
self._line_stats.missed -= 1
|
||||||
|
return line, ret
|
||||||
|
|
||||||
|
def process(self, test_lines):
|
||||||
|
t0 = time.time()
|
||||||
|
for line_no, line in enumerate(test_lines):
|
||||||
|
if isinstance(line, tuple):
|
||||||
|
line_datetimestripped, ret = self.testRegex(
|
||||||
|
line[0], line[1])
|
||||||
|
line = "".join(line[0])
|
||||||
|
else:
|
||||||
|
line = line.rstrip('\r\n')
|
||||||
|
if line.startswith('#') or not line:
|
||||||
|
# skip comment and empty lines
|
||||||
|
continue
|
||||||
|
line_datetimestripped, ret = self.testRegex(line)
|
||||||
|
is_ignored = self.testIgnoreRegex(line_datetimestripped)
|
||||||
|
|
||||||
|
if is_ignored:
|
||||||
|
self._line_stats.ignored += 1
|
||||||
|
if not self._print_no_ignored and (self._print_all_ignored or self._line_stats.ignored <= self._maxlines + 1):
|
||||||
|
self._line_stats.ignored_lines.append(line)
|
||||||
|
self._line_stats.ignored_lines_timeextracted.append(line_datetimestripped)
|
||||||
|
|
||||||
|
if len(ret) > 0:
|
||||||
|
assert(not is_ignored)
|
||||||
|
self._line_stats.matched += 1
|
||||||
|
if self._print_all_matched:
|
||||||
|
self._line_stats.matched_lines.append(line)
|
||||||
|
else:
|
||||||
|
if not is_ignored:
|
||||||
|
self._line_stats.missed += 1
|
||||||
|
if not self._print_no_missed and (self._print_all_missed or self._line_stats.missed <= self._maxlines + 1):
|
||||||
|
self._line_stats.missed_lines.append(line)
|
||||||
|
self._line_stats.missed_lines_timeextracted.append(line_datetimestripped)
|
||||||
|
self._line_stats.tested += 1
|
||||||
|
|
||||||
|
if line_no % 10 == 0 and self._filter.dateDetector is not None:
|
||||||
|
self._filter.dateDetector.sortTemplate()
|
||||||
|
self._time_elapsed = time.time() - t0
|
||||||
|
|
||||||
|
def printLines(self, ltype):
|
||||||
|
lstats = self._line_stats
|
||||||
|
assert(self._line_stats.missed == lstats.tested - (lstats.matched + lstats.ignored))
|
||||||
|
lines = lstats[ltype]
|
||||||
|
l = lstats[ltype + '_lines']
|
||||||
|
if lines:
|
||||||
|
header = "%s line(s):" % (ltype.capitalize(),)
|
||||||
|
if self._debuggex:
|
||||||
|
if ltype == 'missed' or ltype == 'matched':
|
||||||
|
regexlist = self._failregex
|
||||||
|
else:
|
||||||
|
regexlist = self._ignoreregex
|
||||||
|
l = lstats[ltype + '_lines_timeextracted']
|
||||||
|
if lines < self._maxlines or getattr(self, '_print_all_' + ltype):
|
||||||
|
ans = [[]]
|
||||||
|
for arg in [l, regexlist]:
|
||||||
|
ans = [ x + [y] for x in ans for y in arg ]
|
||||||
|
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' +
|
||||||
|
debuggexURL(self.encode_line(a[0]), a[1].getFailRegex()), ans)
|
||||||
|
pprint_list([x.rstrip() for x in b], header)
|
||||||
|
else:
|
||||||
|
output( "%s too many to print. Use --print-all-%s " \
|
||||||
|
"to print all %d lines" % (header, ltype, lines) )
|
||||||
|
elif lines < self._maxlines or getattr(self, '_print_all_' + ltype):
|
||||||
|
pprint_list([x.rstrip() for x in l], header)
|
||||||
|
else:
|
||||||
|
output( "%s too many to print. Use --print-all-%s " \
|
||||||
|
"to print all %d lines" % (header, ltype, lines) )
|
||||||
|
|
||||||
|
def printStats(self):
|
||||||
|
output( "" )
|
||||||
|
output( "Results" )
|
||||||
|
output( "=======" )
|
||||||
|
|
||||||
|
def print_failregexes(title, failregexes):
|
||||||
|
# Print title
|
||||||
|
total, out = 0, []
|
||||||
|
for cnt, failregex in enumerate(failregexes):
|
||||||
|
match = failregex.getStats()
|
||||||
|
total += match
|
||||||
|
if (match or self._verbose):
|
||||||
|
out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
|
||||||
|
|
||||||
|
if self._verbose and len(failregex.getIPList()):
|
||||||
|
for ip in failregex.getIPList():
|
||||||
|
timeTuple = time.localtime(ip[2])
|
||||||
|
timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
|
||||||
|
out.append(
|
||||||
|
" %s %s%s" % (
|
||||||
|
ip[1],
|
||||||
|
timeString,
|
||||||
|
ip[-1] and " (multiple regex matched)" or ""))
|
||||||
|
|
||||||
|
output( "\n%s: %d total" % (title, total) )
|
||||||
|
pprint_list(out, " #) [# of hits] regular expression")
|
||||||
|
return total
|
||||||
|
|
||||||
|
# Print title
|
||||||
|
total = print_failregexes("Failregex", self._failregex)
|
||||||
|
_ = print_failregexes("Ignoreregex", self._ignoreregex)
|
||||||
|
|
||||||
|
|
||||||
|
if self._filter.dateDetector is not None:
|
||||||
|
output( "\nDate template hits:" )
|
||||||
|
out = []
|
||||||
|
for template in self._filter.dateDetector.templates:
|
||||||
|
if self._verbose or template.hits:
|
||||||
|
out.append("[%d] %s" % (
|
||||||
|
template.hits, template.name))
|
||||||
|
pprint_list(out, "[# of hits] date format")
|
||||||
|
|
||||||
|
output( "\nLines: %s" % self._line_stats, )
|
||||||
|
if self._time_elapsed is not None:
|
||||||
|
output( "[processed in %.2f sec]" % self._time_elapsed, )
|
||||||
|
output( "" )
|
||||||
|
|
||||||
|
if self._print_all_matched:
|
||||||
|
self.printLines('matched')
|
||||||
|
if not self._print_no_ignored:
|
||||||
|
self.printLines('ignored')
|
||||||
|
if not self._print_no_missed:
|
||||||
|
self.printLines('missed')
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def file_lines_gen(self, hdlr):
|
||||||
|
for line in hdlr:
|
||||||
|
yield self.decode_line(line)
|
||||||
|
|
||||||
|
def start(self, opts, args):
|
||||||
|
|
||||||
|
cmd_log, cmd_regex = args[:2]
|
||||||
|
|
||||||
|
if not self.readRegex(cmd_regex, 'fail'):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(args) == 3 and not self.readRegex(args[2], 'ignore'):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if os.path.isfile(cmd_log):
|
||||||
|
try:
|
||||||
|
hdlr = open(cmd_log, 'rb')
|
||||||
|
output( "Use log file : %s" % cmd_log )
|
||||||
|
output( "Use encoding : %s" % self.encoding )
|
||||||
|
test_lines = self.file_lines_gen(hdlr)
|
||||||
|
except IOError, e:
|
||||||
|
output( e )
|
||||||
|
return False
|
||||||
|
elif cmd_log == "systemd-journal": # pragma: no cover
|
||||||
|
if not journal:
|
||||||
|
output( "Error: systemd library not found. Exiting..." )
|
||||||
|
return False
|
||||||
|
myjournal = journal.Reader(converters={'__CURSOR': lambda x: x})
|
||||||
|
journalmatch = self._journalmatch
|
||||||
|
self.setDatePattern(None)
|
||||||
|
if journalmatch:
|
||||||
|
try:
|
||||||
|
for element in journalmatch:
|
||||||
|
if element == "+":
|
||||||
|
myjournal.add_disjunction()
|
||||||
|
else:
|
||||||
|
myjournal.add_match(element)
|
||||||
|
except ValueError:
|
||||||
|
output( "Error: Invalid journalmatch: %s" % shortstr(" ".join(journalmatch)) )
|
||||||
|
return False
|
||||||
|
output( "Use journal match : %s" % " ".join(journalmatch) )
|
||||||
|
test_lines = journal_lines_gen(myjournal)
|
||||||
|
else:
|
||||||
|
output( "Use single line : %s" % shortstr(cmd_log) )
|
||||||
|
test_lines = [ cmd_log ]
|
||||||
|
output( "" )
|
||||||
|
|
||||||
|
self.process(test_lines)
|
||||||
|
|
||||||
|
if not self.printStats():
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def exec_command_line(): # pragma: no cover
|
||||||
|
parser = get_opt_parser()
|
||||||
|
(opts, args) = parser.parse_args()
|
||||||
|
if opts.print_no_missed and opts.print_all_missed:
|
||||||
|
sys.stderr.write("ERROR: --print-no-missed and --print-all-missed are mutually exclusive.\n\n")
|
||||||
|
parser.print_help()
|
||||||
|
sys.exit(-1)
|
||||||
|
if opts.print_no_ignored and opts.print_all_ignored:
|
||||||
|
sys.stderr.write("ERROR: --print-no-ignored and --print-all-ignored are mutually exclusive.\n\n")
|
||||||
|
parser.print_help()
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
# We need 2 or 3 parameters
|
||||||
|
if not len(args) in (2, 3):
|
||||||
|
sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
|
||||||
|
parser.print_help()
|
||||||
|
return False
|
||||||
|
|
||||||
|
output( "" )
|
||||||
|
output( "Running tests" )
|
||||||
|
output( "=============" )
|
||||||
|
output( "" )
|
||||||
|
|
||||||
|
# TODO: taken from -testcases -- move common functionality somewhere
|
||||||
|
if opts.log_level is not None: # pragma: no cover
|
||||||
|
# so we had explicit settings
|
||||||
|
logSys.setLevel(getattr(logging, opts.log_level.upper()))
|
||||||
|
else: # pragma: no cover
|
||||||
|
# suppress the logging but it would leave unittests' progress dots
|
||||||
|
# ticking, unless like with '-l critical' which would be silent
|
||||||
|
# unless error occurs
|
||||||
|
logSys.setLevel(getattr(logging, 'CRITICAL'))
|
||||||
|
|
||||||
|
# Add the default logging handler
|
||||||
|
stdout = logging.StreamHandler(sys.stdout)
|
||||||
|
|
||||||
|
fmt = 'D: %(message)s'
|
||||||
|
|
||||||
|
if opts.log_traceback:
|
||||||
|
Formatter = FormatterWithTraceBack
|
||||||
|
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
|
||||||
|
else:
|
||||||
|
Formatter = logging.Formatter
|
||||||
|
|
||||||
|
# Custom log format for the verbose tests runs
|
||||||
|
if opts.verbose: # pragma: no cover
|
||||||
|
stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
|
||||||
|
else: # pragma: no cover
|
||||||
|
# just prefix with the space
|
||||||
|
stdout.setFormatter(Formatter(fmt))
|
||||||
|
logSys.addHandler(stdout)
|
||||||
|
|
||||||
|
fail2banRegex = Fail2banRegex(opts)
|
||||||
|
if not fail2banRegex.start(opts, args):
|
||||||
|
sys.exit(-1)
|
|
@ -792,23 +792,27 @@ class FileContainer:
|
||||||
self.__handler.seek(self.__pos)
|
self.__handler.seek(self.__pos)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def readline(self):
|
@staticmethod
|
||||||
if self.__handler is None:
|
def decode_line(filename, enc, line):
|
||||||
return ""
|
|
||||||
line = self.__handler.readline()
|
|
||||||
try:
|
try:
|
||||||
line = line.decode(self.getEncoding(), 'strict')
|
line = line.decode(enc, 'strict')
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
logSys.warning(
|
logSys.warning(
|
||||||
"Error decoding line from '%s' with '%s'."
|
"Error decoding line from '%s' with '%s'."
|
||||||
" Consider setting logencoding=utf-8 (or another appropriate"
|
" Consider setting logencoding=utf-8 (or another appropriate"
|
||||||
" encoding) for this jail. Continuing"
|
" encoding) for this jail. Continuing"
|
||||||
" to process line ignoring invalid characters: %r" %
|
" to process line ignoring invalid characters: %r" %
|
||||||
(self.getFileName(), self.getEncoding(), line))
|
(filename, enc, line))
|
||||||
# decode with replacing error chars:
|
# decode with replacing error chars:
|
||||||
line = line.decode(self.getEncoding(), 'replace')
|
line = line.decode(enc, 'replace')
|
||||||
return line
|
return line
|
||||||
|
|
||||||
|
def readline(self):
|
||||||
|
if self.__handler is None:
|
||||||
|
return ""
|
||||||
|
return FileContainer.decode_line(
|
||||||
|
self.getFileName(), self.getEncoding(), self.__handler.readline())
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if not self.__handler is None:
|
if not self.__handler is None:
|
||||||
# Saves the last position.
|
# Saves the last position.
|
||||||
|
|
|
@ -0,0 +1,181 @@
|
||||||
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
||||||
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
||||||
|
|
||||||
|
# This file is part of Fail2Ban.
|
||||||
|
#
|
||||||
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# Fail2Ban is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with Fail2Ban; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
|
||||||
|
# Fail2Ban developers
|
||||||
|
|
||||||
|
__author__ = "Serg Brester"
|
||||||
|
__copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Contributors"
|
||||||
|
__license__ = "GPL"
|
||||||
|
|
||||||
|
from __builtin__ import open as fopen
|
||||||
|
import unittest
|
||||||
|
import getpass
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import tempfile
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
try:
|
||||||
|
from systemd import journal
|
||||||
|
except ImportError:
|
||||||
|
journal = None
|
||||||
|
|
||||||
|
from ..client import fail2banregex
|
||||||
|
from ..client.fail2banregex import Fail2banRegex, get_opt_parser, output
|
||||||
|
from .utils import LogCaptureTestCase, logSys
|
||||||
|
|
||||||
|
|
||||||
|
fail2banregex.logSys = logSys
|
||||||
|
def _test_output(*args):
|
||||||
|
logSys.info(args[0])
|
||||||
|
|
||||||
|
fail2banregex.output = _test_output
|
||||||
|
|
||||||
|
CONF_FILES_DIR = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__),"..", "..", "config"))
|
||||||
|
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
||||||
|
|
||||||
|
|
||||||
|
def _Fail2banRegex(*args):
|
||||||
|
parser = get_opt_parser()
|
||||||
|
(opts, args) = parser.parse_args(list(args))
|
||||||
|
return (opts, args, Fail2banRegex(opts))
|
||||||
|
|
||||||
|
class Fail2banRegexTest(LogCaptureTestCase):
|
||||||
|
|
||||||
|
RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
|
||||||
|
|
||||||
|
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
|
||||||
|
FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
|
||||||
|
FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
|
||||||
|
|
||||||
|
FILTER_SSHD = os.path.join(CONF_FILES_DIR, 'filter.d', 'sshd.conf')
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Call before every test case."""
|
||||||
|
LogCaptureTestCase.setUp(self)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Call after every test case."""
|
||||||
|
LogCaptureTestCase.tearDown(self)
|
||||||
|
|
||||||
|
def testWrongRE(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"test", r".** from <HOST>$"
|
||||||
|
)
|
||||||
|
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged("Unable to compile regular expression")
|
||||||
|
|
||||||
|
def testWrongIngnoreRE(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"test", r".*? from <HOST>$", r".**"
|
||||||
|
)
|
||||||
|
self.assertRaises(Exception, lambda: fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged("Unable to compile regular expression")
|
||||||
|
|
||||||
|
def testDirectFound(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--print-all-matched", "--print-no-missed",
|
||||||
|
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||||
|
r"Authentication failure for .*? from <HOST>$"
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
|
||||||
|
|
||||||
|
def testDirectNotFound(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--print-all-missed",
|
||||||
|
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||||
|
r"XYZ from <HOST>$"
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed')
|
||||||
|
|
||||||
|
def testDirectIgnored(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--print-all-ignored",
|
||||||
|
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
|
||||||
|
r"Authentication failure for .*? from <HOST>$",
|
||||||
|
r"kevin from 192.0.2.0$"
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed')
|
||||||
|
|
||||||
|
def testDirectRE_1(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--print-all-matched",
|
||||||
|
Fail2banRegexTest.FILENAME_01,
|
||||||
|
Fail2banRegexTest.RE_00
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
|
||||||
|
|
||||||
|
self.assertLogged('Error decoding line');
|
||||||
|
self.assertLogged('Continuing to process line ignoring invalid characters')
|
||||||
|
|
||||||
|
self.assertLogged('Dez 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128')
|
||||||
|
self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10')
|
||||||
|
|
||||||
|
def testDirectRE_2(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--print-all-matched",
|
||||||
|
Fail2banRegexTest.FILENAME_02,
|
||||||
|
Fail2banRegexTest.RE_00
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
|
||||||
|
|
||||||
|
def testVerbose(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--verbose", "--print-no-missed",
|
||||||
|
Fail2banRegexTest.FILENAME_02,
|
||||||
|
Fail2banRegexTest.RE_00
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
|
||||||
|
|
||||||
|
self.assertLogged('141.3.81.106 Fri Aug 14 11:53:59 2015')
|
||||||
|
self.assertLogged('141.3.81.106 Fri Aug 14 11:54:59 2015')
|
||||||
|
|
||||||
|
def testWronChar(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
|
||||||
|
|
||||||
|
self.assertLogged('Error decoding line');
|
||||||
|
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user ');
|
||||||
|
self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user ');
|
||||||
|
|
||||||
|
self.assertLogged('Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco')
|
||||||
|
self.assertLogged('Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llinco')
|
||||||
|
|
||||||
|
def testWronCharDebuggex(self):
|
||||||
|
(opts, args, fail2banRegex) = _Fail2banRegex(
|
||||||
|
"--debuggex", "--print-all-matched",
|
||||||
|
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
|
||||||
|
)
|
||||||
|
self.assertTrue(fail2banRegex.start(opts, args))
|
||||||
|
self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
|
||||||
|
|
||||||
|
self.assertLogged('http://')
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
Nov 8 00:16:12 main sshd[32547]: Invalid user llinco\361ir from 192.0.2.0
|
||||||
|
Nov 8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco\361ir
|
||||||
|
Nov 8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llincoñir
|
||||||
|
Nov 8 00:16:14 main sshd[32547]: Failed password for invalid user llinco\361ir from 192.0.2.0 port 57025 ssh2
|
|
@ -85,6 +85,7 @@ def gatherTests(regexps=None, no_network=False):
|
||||||
from . import misctestcase
|
from . import misctestcase
|
||||||
from . import databasetestcase
|
from . import databasetestcase
|
||||||
from . import samplestestcase
|
from . import samplestestcase
|
||||||
|
from . import fail2banregextestcase
|
||||||
|
|
||||||
if not regexps: # pragma: no cover
|
if not regexps: # pragma: no cover
|
||||||
tests = unittest.TestSuite()
|
tests = unittest.TestSuite()
|
||||||
|
@ -152,6 +153,9 @@ def gatherTests(regexps=None, no_network=False):
|
||||||
# Filter Regex tests with sample logs
|
# Filter Regex tests with sample logs
|
||||||
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
||||||
|
|
||||||
|
# bin/fail2ban-regex
|
||||||
|
tests.addTest(unittest.makeSuite(fail2banregextestcase.Fail2banRegexTest))
|
||||||
|
|
||||||
#
|
#
|
||||||
# Python action testcases
|
# Python action testcases
|
||||||
#
|
#
|
||||||
|
|
Loading…
Reference in New Issue