mirror of https://github.com/fail2ban/fail2ban
				
				
				
			
		
			
				
	
	
		
			252 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			252 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
#!/usr/bin/python
 | 
						|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
 | 
						|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
 | 
						|
"""Script to run Fail2Ban tests battery
 | 
						|
"""
 | 
						|
 | 
						|
# This file is part of Fail2Ban.
 | 
						|
#
 | 
						|
# Fail2Ban is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# Fail2Ban is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with Fail2Ban; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 | 
						|
 | 
						|
__author__ = "Cyril Jaquier"
 | 
						|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012- Yaroslav Halchenko"
 | 
						|
__license__ = "GPL"
 | 
						|
 | 
						|
 | 
						|
import unittest, logging, sys, time, os
 | 
						|
 | 
						|
if sys.version_info >= (2, 6):
 | 
						|
	import json
 | 
						|
else: # pragma: no cover
 | 
						|
	try:
 | 
						|
		import simplejson as json
 | 
						|
	except ImportError:
 | 
						|
		json = None
 | 
						|
 | 
						|
from common.version import version
 | 
						|
from testcases import banmanagertestcase
 | 
						|
from testcases import clientreadertestcase
 | 
						|
from testcases import failmanagertestcase
 | 
						|
from testcases import filtertestcase
 | 
						|
from testcases import servertestcase
 | 
						|
from testcases import datedetectortestcase
 | 
						|
from testcases import actiontestcase
 | 
						|
from testcases import sockettestcase
 | 
						|
from testcases import misctestcase
 | 
						|
if json:
 | 
						|
	from testcases import samplestestcase
 | 
						|
 | 
						|
from testcases.utils import FormatterWithTraceBack
 | 
						|
from testcases import actionstestcase
 | 
						|
from server.mytime import MyTime
 | 
						|
 | 
						|
from optparse import OptionParser, Option
 | 
						|
 | 
						|
def get_opt_parser():
	"""Build the command line parser of the tests runner.

	The usage message consists of the invocation line followed by the
	docstring of this module, so the latter shows up in the help output.

	Returns an optparse.OptionParser knowing the --log-level,
	--no-network, --log-traceback and --full-traceback options.
	"""
	usage_text = "%s [OPTIONS] [regexps]\n" % sys.argv[0]
	opt_parser = OptionParser(usage=usage_text + __doc__,
							  version="%prog " + version)

	# None (the default) means that no log level was requested explicitly
	log_level_opt = Option(
		'-l', "--log-level", type="choice", dest="log_level",
		choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
		default=None,
		help="Log level for the logger to use during running tests")
	no_network_opt = Option(
		'-n', "--no-network", action="store_true", dest="no_network",
		help="Do not run tests that require the network")
	# no explicit dest for the two options below: optparse derives it
	# from the long option name (log_traceback, full_traceback)
	log_traceback_opt = Option(
		"-t", "--log-traceback", action='store_true',
		help="Enrich log-messages with compressed tracebacks")
	full_traceback_opt = Option(
		"--full-traceback", action='store_true',
		help="Either to make the tracebacks full, not compressed (as by default)")

	# register in the same order as they should be listed in the help
	for option in (log_level_opt, no_network_opt,
				   log_traceback_opt, full_traceback_opt):
		opt_parser.add_option(option)

	return opt_parser
 | 
						|
 | 
						|
# Parse the command line: opts holds the options, regexps the remaining
# positional arguments
parser = get_opt_parser()
(opts, regexps) = parser.parse_args()

#
# Logging
#
logSys = logging.getLogger("fail2ban")

# Numerical level of verbosity corresponding to a log "level"
# (None stands for "no --log-level given on the command line")
verbosity = {'heavydebug': 4,
			 'debug': 3,
			 'info': 2,
			 'warning': 1,
			 'error': 1,
			 'fatal': 0,
			 None: 1}[opts.log_level]

if opts.log_level is not None: # pragma: no cover
	# so we had explicit settings
	# The standard logging module has no HEAVYDEBUG level, so unless
	# logging.HEAVYDEBUG got registered by some imported module, fall
	# back to a numeric level below DEBUG instead of failing with an
	# AttributeError on '-l heavydebug'
	logSys.setLevel(getattr(logging, opts.log_level.upper(), 5))
else: # pragma: no cover
	# suppress the logging but it would leave unittests' progress dots
	# ticking, unless like with '-l fatal' which would be silent
	# unless error occurs
	logSys.setLevel(logging.FATAL)
 | 
						|
 | 
						|
# Add the default logging handler, which writes to the standard output
stdout = logging.StreamHandler(sys.stdout)

# Choose the formatter class and the layout of the message part:
# tracebacks are prepended only if requested on the command line
fmt = ' %(message)s'
Formatter = logging.Formatter
if opts.log_traceback or opts.full_traceback:
	Formatter = FormatterWithTraceBack
	if opts.full_traceback:
		fmt = ' %(tb)s' + fmt
	else:
		fmt = ' %(tbc)s' + fmt

# Custom log format for the verbose tests runs, otherwise
# just prefix with the space
line_fmt = fmt
if verbosity > 1: # pragma: no cover
	line_fmt = ' %(asctime)-15s %(thread)s' + fmt
stdout.setFormatter(Formatter(line_fmt))
logSys.addHandler(stdout)
 | 
						|
 | 
						|
#
# Announce the version
#
 | 
						|
# Stay silent only if the 'fatal' log level was requested explicitly
if opts.log_level != 'fatal': # pragma: no cover
	# sys.version might span multiple lines, so join them into one
	py_version = str(sys.version).replace('\n', '')
	print("Fail2ban %s test suite. Python %s. Please wait..."
		  % (version, py_version))
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Gather the tests
 | 
						|
#
 | 
						|
if regexps: # pragma: no cover
	import re

	class FilteredTestSuite(unittest.TestSuite):
		"""Test suite which accepts only the tests matching the regexps.

		A test (or a suite) is added only if its str() representation
		matches at least one of the regular expressions given on the
		command line; anything else is silently dropped.
		"""
		_regexps = [re.compile(r) for r in regexps]

		def addTest(self, suite):
			"""Add suite if any of the regexps is found in str(suite)."""
			description = str(suite)
			for regexp in self._regexps:
				if not regexp.search(description):
					continue
				# the first match is sufficient, so add it only once
				super(FilteredTestSuite, self).addTest(suite)
				break

	tests = FilteredTestSuite()
else: # pragma: no cover
	# no regexps given -- nothing to filter, so take everything
	tests = unittest.TestSuite()
 | 
						|
 | 
						|
# Test cases which are always considered, listed in the order they
# should be added to the suite
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
for testcase_class in (
		# Server
		servertestcase.Transmitter,
		servertestcase.JailTests,
		servertestcase.RegexTests,
		actiontestcase.ExecuteAction,
		actionstestcase.ExecuteActions,
		# FailManager
		failmanagertestcase.AddFailure,
		# BanManager
		banmanagertestcase.AddFailure,
		# ClientReaders
		clientreadertestcase.ConfigReaderTest,
		clientreadertestcase.JailReaderTest,
		clientreadertestcase.JailsReaderTest,
		# CSocket and AsyncServer
		sockettestcase.Socket,
		# Misc helpers
		misctestcase.HelpersTest,
		misctestcase.SetupTest,
		misctestcase.TestsUtilsTest,
		misctestcase.CustomDateFormatsTest,
	):
	tests.addTest(unittest.makeSuite(testcase_class))
 | 
						|
 | 
						|
# Filter
# (the test cases guarded by with_network are left out if --no-network
# was given on the command line)
with_network = not opts.no_network
if with_network:
	tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIPDNS))
for testcase_class in (
		filtertestcase.IgnoreIP,
		filtertestcase.BasicFilter,
		filtertestcase.LogFile,
		filtertestcase.LogFileFilterPoll,
		filtertestcase.LogFileMonitor,
	):
	tests.addTest(unittest.makeSuite(testcase_class))
if with_network:
	for testcase_class in (
			filtertestcase.GetFailures,
			filtertestcase.DNSUtilsTests,
		):
		tests.addTest(unittest.makeSuite(testcase_class))
tests.addTest(unittest.makeSuite(filtertestcase.JailTests))

# DateDetector
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))

# Filter Regex tests with sample logs, which could be run only
# if a json module was imported
if not json: # pragma: no cover
	print("I: Skipping filter samples testing. No simplejson/json module")
else:
	tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
 | 
						|
 | 
						|
#
 | 
						|
# Extensive use-tests of different available filters backends
 | 
						|
#
 | 
						|
 | 
						|
from server.filterpoll import FilterPoll
 | 
						|
filters = [FilterPoll]					  # always available
 | 
						|
 | 
						|
# Additional filters available only if external modules are available
 | 
						|
# yoh: Since I do not know better way for parametric tests
 | 
						|
#      with good old unittest
 | 
						|
try:
 | 
						|
	from server.filtergamin import FilterGamin
 | 
						|
	filters.append(FilterGamin)
 | 
						|
except Exception, e: # pragma: no cover
 | 
						|
	print "I: Skipping gamin backend testing. Got exception '%s'" % e
 | 
						|
 | 
						|
try:
 | 
						|
	from server.filterpyinotify import FilterPyinotify
 | 
						|
	filters.append(FilterPyinotify)
 | 
						|
except Exception, e: # pragma: no cover
 | 
						|
	print "I: Skipping pyinotify backend testing. Got exception '%s'" % e
 | 
						|
 | 
						|
for Filter_ in filters:
 | 
						|
	tests.addTest(unittest.makeSuite(
 | 
						|
		filtertestcase.get_monitor_failures_testcase(Filter_)))
 | 
						|
 | 
						|
# Server test for logging elements which break logging used to support
 | 
						|
# testcases analysis
 | 
						|
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
 | 
						|
 | 
						|
#
 | 
						|
# Run the tests
 | 
						|
#
 | 
						|
testRunner = unittest.TextTestRunner(verbosity=verbosity)

# Remember the original time zone setting (None if TZ was not set at all)
# before entering the try block, so it is always known to the cleanup code
old_TZ = os.environ.get('TZ', None)

try:
	# Set the time to a fixed, known value
	# Sun Aug 14 12:00:00 CEST 2005
	# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
	os.environ['TZ'] = 'Europe/Zurich'
	time.tzset()
	MyTime.setTime(1124013600)

	tests_results = testRunner.run(tests)

finally: # pragma: no cover
	# Just for the sake of it reset the TZ
	# yoh: move all this into setup/teardown methods within tests
	if old_TZ is None:
		# TZ was not set originally; tolerate it being already removed
		# so that a KeyError could not mask an error from the tests run
		os.environ.pop('TZ', None)
	else:
		# restore the original value, even if it was an empty string
		os.environ['TZ'] = old_TZ
	time.tzset()

# Let the caller know about the failure through the exit code
if not tests_results.wasSuccessful(): # pragma: no cover
	sys.exit(1)
 |