mirror of https://github.com/fail2ban/fail2ban
				
				
				
			
		
			
				
	
	
		
			425 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			425 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
#!/usr/bin/python
 | 
						|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
 | 
						|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
 | 
						|
#
 | 
						|
# This file is part of Fail2Ban.
 | 
						|
#
 | 
						|
# Fail2Ban is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# Fail2Ban is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with Fail2Ban; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 | 
						|
"""
 | 
						|
Fail2Ban  reads log file that contains password failure report
 | 
						|
and bans the corresponding IP addresses using firewall rules.
 | 
						|
 | 
						|
This tools can test regular expressions for "fail2ban".
 | 
						|
 | 
						|
Report bugs to https://github.com/fail2ban/fail2ban/issues
 | 
						|
"""
 | 
						|
 | 
						|
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
 | 
						|
__copyright__ = "Copyright (c) 2004-2008 Cyril Jaquier, 2012-2013 Yaroslav Halchenko"
 | 
						|
__license__ = "GPL"
 | 
						|
 | 
						|
import getopt, sys, time, logging, os, urllib, re
 | 
						|
 | 
						|
# Inserts our own modules path first in the list
 | 
						|
# fix for bug #343821
 | 
						|
try:
 | 
						|
	from common.version import version
 | 
						|
except ImportError, e:
 | 
						|
	sys.path.insert(1, "/usr/share/fail2ban")
 | 
						|
	from common.version import version
 | 
						|
 | 
						|
from optparse import OptionParser, Option
 | 
						|
 | 
						|
from client.configparserinc import SafeConfigParserWithIncludes
 | 
						|
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
 | 
						|
from server.filter import Filter
 | 
						|
from server.failregex import RegexException
 | 
						|
 | 
						|
from testcases.utils import FormatterWithTraceBack
 | 
						|
# Gets the instance of the logger.
 | 
						|
logSys = logging.getLogger("fail2ban")
 | 
						|
 | 
						|
def debuggexURL(sample, regex):
 | 
						|
	q = urllib.urlencode({ 're': re.sub('<HOST>', '(?&.ipv4)', regex),
 | 
						|
							'str': sample,
 | 
						|
							'flavor': 'python' })
 | 
						|
	return 'http://www.debuggex.com/?' + q
 | 
						|
 | 
						|
def shortstr(s, l=53):
 | 
						|
	"""Return shortened string
 | 
						|
	"""
 | 
						|
	if len(s) > l:
 | 
						|
		return s[:l-3] + '...'
 | 
						|
	return s
 | 
						|
 | 
						|
def pprint_list(l, header=None):
 | 
						|
	if not len(l):
 | 
						|
		return
 | 
						|
	if header:
 | 
						|
		s = "|- %s\n" % header
 | 
						|
	else:
 | 
						|
		s = ''
 | 
						|
	print s + "|  " + "\n|  ".join(l) + '\n`-'
 | 
						|
 | 
						|
def get_opt_parser():
 | 
						|
	# use module docstring for help output
 | 
						|
	p = OptionParser(
 | 
						|
				usage="%s [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]\n" % sys.argv[0] + __doc__
 | 
						|
				+ """
 | 
						|
LOG:
 | 
						|
    string                  a string representing a log line
 | 
						|
    filename                path to a log file (/var/log/auth.log)
 | 
						|
 | 
						|
REGEX:
 | 
						|
    string                  a string representing a 'failregex'
 | 
						|
    filename                path to a filter file (filter.d/sshd.conf)
 | 
						|
 | 
						|
IGNOREREGEX:
 | 
						|
    string                  a string representing an 'ignoreregex'
 | 
						|
    filename                path to a filter file (filter.d/sshd.conf)
 | 
						|
""",
 | 
						|
				version="%prog " + version)
 | 
						|
 | 
						|
	p.add_options([
 | 
						|
		Option('-l', "--log-level", type="choice",
 | 
						|
			   dest="log_level",
 | 
						|
			   choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
 | 
						|
			   default=None,
 | 
						|
			   help="Log level for the Fail2Ban logger to use"),
 | 
						|
		Option("-v", "--verbose", action='store_true',
 | 
						|
			   help="Be verbose in output"),
 | 
						|
		Option("-d", "--debuggex", action='store_true',
 | 
						|
			   help="Produce debuggex.com urls for debugging there"),
 | 
						|
		Option("--print-all-missed", action='store_true',
 | 
						|
			   help="Either to print all missed lines"),
 | 
						|
		Option("--print-all-ignored", action='store_true',
 | 
						|
			   help="Either to print all ignored lines"),
 | 
						|
		Option("-t", "--log-traceback", action='store_true',
 | 
						|
			   help="Enrich log-messages with compressed tracebacks"),
 | 
						|
		Option("--full-traceback", action='store_true',
 | 
						|
			   help="Either to make the tracebacks full, not compressed (as by default)"),
 | 
						|
 | 
						|
		])
 | 
						|
 | 
						|
	return p
 | 
						|
 | 
						|
 | 
						|
class RegexStat(object):
 | 
						|
 | 
						|
	def __init__(self, failregex):
 | 
						|
		self._stats = 0
 | 
						|
		self._failregex = failregex
 | 
						|
		self._ipList = list()
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		return "%s(%r) %d failed: %s" \
 | 
						|
		  % (self.__class__, self._failregex, self._stats, self._ipList)
 | 
						|
 | 
						|
	def inc(self):
 | 
						|
		self._stats += 1
 | 
						|
 | 
						|
	def getStats(self):
 | 
						|
		return self._stats
 | 
						|
 | 
						|
	def getFailRegex(self):
 | 
						|
		return self._failregex
 | 
						|
 | 
						|
	def appendIP(self, value):
 | 
						|
		self._ipList.append(value)
 | 
						|
 | 
						|
	def getIPList(self):
 | 
						|
		return self._ipList
 | 
						|
 | 
						|
 | 
						|
class LineStats(object):
 | 
						|
	"""Just a convenience container for stats
 | 
						|
	"""
 | 
						|
	def __init__(self):
 | 
						|
		self.tested = self.matched = 0
 | 
						|
		self.missed_lines = []
 | 
						|
		self.missed_lines_timeextracted = []
 | 
						|
		self.ignored_lines = []
 | 
						|
		self.ignored_lines_timeextracted = []
 | 
						|
 | 
						|
	def __str__(self):
 | 
						|
		return "%(tested)d lines, %(ignored)d ignored, %(matched)d matched, %(missed)d missed" % self
 | 
						|
 | 
						|
	@property
 | 
						|
	def ignored(self):
 | 
						|
		return len(self.ignored_lines)
 | 
						|
 | 
						|
	@property
 | 
						|
	def missed(self):
 | 
						|
		return self.tested - (self.ignored + self.matched)
 | 
						|
 | 
						|
	# just for convenient str
 | 
						|
	def __getitem__(self, key):
 | 
						|
		return getattr(self, key)
 | 
						|
 | 
						|
 | 
						|
class Fail2banRegex(object):
 | 
						|
 | 
						|
	CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"}
 | 
						|
 | 
						|
	def __init__(self, opts):
 | 
						|
		self._verbose = opts.verbose
 | 
						|
		self._debuggex = opts.debuggex
 | 
						|
		self._print_all_missed = opts.print_all_missed
 | 
						|
		self._print_all_ignored = opts.print_all_ignored
 | 
						|
 | 
						|
		self._filter = Filter(None)
 | 
						|
		self._ignoreregex = list()
 | 
						|
		self._failregex = list()
 | 
						|
		self._line_stats = LineStats()
 | 
						|
 | 
						|
 | 
						|
	def readRegex(self, value, regextype):
 | 
						|
		assert(regextype in ('fail', 'ignore'))
 | 
						|
		regex = regextype + 'regex'
 | 
						|
		if os.path.isfile(value):
 | 
						|
			reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
 | 
						|
			try:
 | 
						|
				reader.read(value)
 | 
						|
				print "Use %11s file : %s" % (regex, value)
 | 
						|
				# TODO: reuse functionality in client
 | 
						|
				regex_values = [
 | 
						|
					RegexStat(m)
 | 
						|
					for m in reader.get("Definition", regex).split('\n')
 | 
						|
					if m != ""]
 | 
						|
			except NoSectionError:
 | 
						|
				print "No [Definition] section in %s" % value
 | 
						|
				return False
 | 
						|
			except NoOptionError:
 | 
						|
				print "No %s option in %s" % (regex, value)
 | 
						|
				return False
 | 
						|
			except MissingSectionHeaderError:
 | 
						|
				print "No section headers in %s" % value
 | 
						|
				return False
 | 
						|
		else:
 | 
						|
			print "Use %11s line : %s" % (regex, shortstr(value))
 | 
						|
			regex_values = [RegexStat(value)]
 | 
						|
 | 
						|
		setattr(self, "_" + regex, regex_values)
 | 
						|
		for regex in regex_values:
 | 
						|
			getattr(
 | 
						|
				self._filter,
 | 
						|
				'add%sRegex' % regextype.title())(regex.getFailRegex())
 | 
						|
		return True
 | 
						|
 | 
						|
	def testIgnoreRegex(self, line):
 | 
						|
		found = False
 | 
						|
		try:
 | 
						|
			ret = self._filter.ignoreLine(line)
 | 
						|
			if ret is not None:
 | 
						|
				found = True
 | 
						|
				regex = self._ignoreregex[ret].inc()
 | 
						|
		except RegexException, e:
 | 
						|
			print e
 | 
						|
			return False
 | 
						|
		return found
 | 
						|
 | 
						|
	def testRegex(self, line):
 | 
						|
		try:
 | 
						|
			line, ret = self._filter.processLine(line, checkAllRegex=True)
 | 
						|
			for match in ret:
 | 
						|
				# Append True/False flag depending if line was matched by
 | 
						|
				# more than one regex
 | 
						|
				match.append(len(ret)>1)
 | 
						|
				regex = self._failregex[match[0]]
 | 
						|
				regex.inc()
 | 
						|
				regex.appendIP(match)
 | 
						|
		except RegexException, e:
 | 
						|
			print e
 | 
						|
			return False
 | 
						|
		except IndexError:
 | 
						|
			print "Sorry, but no <HOST> found in regex"
 | 
						|
			return False
 | 
						|
		return line, ret
 | 
						|
 | 
						|
 | 
						|
	def process(self, test_lines):
 | 
						|
 | 
						|
		for line_no, line in enumerate(test_lines):
 | 
						|
			if line.startswith('#') or not line.strip():
 | 
						|
				# skip comment and empty lines
 | 
						|
				continue
 | 
						|
			is_ignored = fail2banRegex.testIgnoreRegex(line)
 | 
						|
			line_datetimestripped, ret = fail2banRegex.testRegex(line)
 | 
						|
 | 
						|
			if is_ignored:
 | 
						|
				self._line_stats.ignored_lines.append(line)
 | 
						|
				self._line_stats.ignored_lines_timeextracted.append(line_datetimestripped)
 | 
						|
 | 
						|
			if len(ret) > 0:
 | 
						|
				assert(not is_ignored)
 | 
						|
				self._line_stats.matched += 1
 | 
						|
			else:
 | 
						|
				if not is_ignored:
 | 
						|
					self._line_stats.missed_lines.append(line)
 | 
						|
					self._line_stats.missed_lines_timeextracted.append(line_datetimestripped)
 | 
						|
			self._line_stats.tested += 1
 | 
						|
 | 
						|
			if line_no % 10 == 0:
 | 
						|
				self._filter.dateDetector.sortTemplate()
 | 
						|
 | 
						|
 | 
						|
 | 
						|
	def printLines(self, ltype):
 | 
						|
		lstats = self._line_stats
 | 
						|
		assert(len(lstats.missed_lines) == lstats.tested - (lstats.matched + lstats.ignored))
 | 
						|
		l = lstats[ltype + '_lines']
 | 
						|
		if len(l):
 | 
						|
			header = "%s line(s):" % (ltype.capitalize(),)
 | 
						|
			if self._debuggex:
 | 
						|
				if ltype == 'missed':
 | 
						|
					regexlist = self._failregex
 | 
						|
				else:
 | 
						|
					regexlist = self._ignoreregex
 | 
						|
				l = lstats[ltype + '_lines_timeextracted']
 | 
						|
				lines = len(l)*len(regexlist)
 | 
						|
				if lines > 20 or getattr(self, '_print_all_' + ltype):
 | 
						|
					ans = [[]]
 | 
						|
					for arg in [l, regexlist]:
 | 
						|
					    ans = [ x + [y] for x in ans for y in arg ]
 | 
						|
					b = map(lambda a: a[0] +  ' | ' + a[1].getFailRegex() + ' |  ' + debuggexURL(a[0], a[1].getFailRegex()), ans)
 | 
						|
					pprint_list([x.rstrip() for x in b], header)
 | 
						|
				else:
 | 
						|
					print "%s: too many to print.  Use --print-all-%s " \
 | 
						|
						  "to print all %d lines" % (header, ltype, lines)
 | 
						|
			elif len(l) < 20 or getattr(self, '_print_all_' + ltype):
 | 
						|
				pprint_list([x.rstrip() for x in l], header)
 | 
						|
			else:
 | 
						|
				print "%s: too many to print.  Use --print-all-%s " \
 | 
						|
					  "to print all %d lines" % (header, ltype, len(l))
 | 
						|
 | 
						|
	def printStats(self):
 | 
						|
		print
 | 
						|
		print "Results"
 | 
						|
		print "======="
 | 
						|
 | 
						|
		def print_failregexes(title, failregexes):
 | 
						|
			# Print title
 | 
						|
			total, out = 0, []
 | 
						|
			for cnt, failregex in enumerate(failregexes):
 | 
						|
				match = failregex.getStats()
 | 
						|
				total += match
 | 
						|
				if (match or self._verbose):
 | 
						|
					out.append("%2d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
 | 
						|
 | 
						|
				if self._verbose and len(failregex.getIPList()):
 | 
						|
					for ip in failregex.getIPList():
 | 
						|
						timeTuple = time.localtime(ip[2])
 | 
						|
						timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
 | 
						|
						out.append(
 | 
						|
							"    %s  %s%s" % (
 | 
						|
								ip[1],
 | 
						|
								timeString,
 | 
						|
								ip[3] and " (multiple regex matched)" or ""))
 | 
						|
 | 
						|
			print "\n%s: %d total" % (title, total)
 | 
						|
			pprint_list(out, " #) [# of hits] regular expression")
 | 
						|
			return total
 | 
						|
 | 
						|
		# Print title
 | 
						|
		total = print_failregexes("Failregex", self._failregex)
 | 
						|
		_ = print_failregexes("Ignoreregex", self._ignoreregex)
 | 
						|
 | 
						|
 | 
						|
		print "\nDate template hits:"
 | 
						|
		out = []
 | 
						|
		for template in self._filter.dateDetector.getTemplates():
 | 
						|
			if self._verbose or template.getHits():
 | 
						|
				out.append("[%d] %s" % (template.getHits(), template.getName()))
 | 
						|
		pprint_list(out, "[# of hits] date format")
 | 
						|
 | 
						|
		print "\nLines: %s" % self._line_stats
 | 
						|
 | 
						|
		self.printLines('ignored')
 | 
						|
		self.printLines('missed')
 | 
						|
 | 
						|
		return True
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
 | 
						|
	parser = get_opt_parser()
 | 
						|
	(opts, args) = parser.parse_args()
 | 
						|
 | 
						|
	fail2banRegex = Fail2banRegex(opts)
 | 
						|
 | 
						|
	# We need 2 or 3 parameters
 | 
						|
	if not len(args) in (2, 3):
 | 
						|
		sys.stderr.write("ERROR: provide both <LOG> and <REGEX>.\n\n")
 | 
						|
		parser.print_help()
 | 
						|
		sys.exit(-1)
 | 
						|
 | 
						|
	# TODO: taken from -testcases -- move common functionality somewhere
 | 
						|
	if opts.log_level is not None: # pragma: no cover
 | 
						|
		# so we had explicit settings
 | 
						|
		logSys.setLevel(getattr(logging, opts.log_level.upper()))
 | 
						|
	else: # pragma: no cover
 | 
						|
		# suppress the logging but it would leave unittests' progress dots
 | 
						|
		# ticking, unless like with '-l fatal' which would be silent
 | 
						|
		# unless error occurs
 | 
						|
		logSys.setLevel(getattr(logging, 'FATAL'))
 | 
						|
 | 
						|
	# Add the default logging handler
 | 
						|
	stdout = logging.StreamHandler(sys.stdout)
 | 
						|
 | 
						|
	fmt = 'D: %(message)s'
 | 
						|
 | 
						|
	if opts.log_traceback:
 | 
						|
		Formatter = FormatterWithTraceBack
 | 
						|
		fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
 | 
						|
	else:
 | 
						|
		Formatter = logging.Formatter
 | 
						|
 | 
						|
	# Custom log format for the verbose tests runs
 | 
						|
	if opts.verbose > 1: # pragma: no cover
 | 
						|
		stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
 | 
						|
	else: # pragma: no cover
 | 
						|
		# just prefix with the space
 | 
						|
		stdout.setFormatter(Formatter(fmt))
 | 
						|
	logSys.addHandler(stdout)
 | 
						|
 | 
						|
	print
 | 
						|
	print "Running tests"
 | 
						|
	print "============="
 | 
						|
	print
 | 
						|
 | 
						|
	cmd_log, cmd_regex = args[:2]
 | 
						|
 | 
						|
	if len(args) == 3:
 | 
						|
		fail2banRegex.readRegex(args[2], 'ignore') or sys.exit(-1)
 | 
						|
 | 
						|
	fail2banRegex.readRegex(cmd_regex, 'fail') or sys.exit(-1)
 | 
						|
 | 
						|
	if os.path.isfile(cmd_log):
 | 
						|
		try:
 | 
						|
			hdlr = open(cmd_log)
 | 
						|
			print "Use         log file : %s" % cmd_log
 | 
						|
			test_lines = hdlr # Iterable
 | 
						|
		except IOError, e:
 | 
						|
			print e
 | 
						|
			sys.exit(-1)
 | 
						|
	else:
 | 
						|
		print "Use      single line : %s" % shortstr(cmd_log)
 | 
						|
		test_lines = [ cmd_log ]
 | 
						|
	print
 | 
						|
 | 
						|
	fail2banRegex.process(test_lines)
 | 
						|
 | 
						|
	fail2banRegex.printStats() or sys.exit(-1)
 |