mirror of https://github.com/fail2ban/fail2ban
				
				
				
			
		
			
				
	
	
		
			228 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			228 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
#!/usr/bin/python
 | 
						|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
 | 
						|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
 | 
						|
"""Script to run Fail2Ban tests battery
 | 
						|
"""
 | 
						|
 | 
						|
# This file is part of Fail2Ban.
 | 
						|
#
 | 
						|
# Fail2Ban is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# Fail2Ban is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with Fail2Ban; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 | 
						|
 | 
						|
__author__ = "Cyril Jaquier"
 | 
						|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012- Yaroslav Halchenko"
 | 
						|
__license__ = "GPL"
 | 
						|
 | 
						|
 | 
						|
import unittest, logging, sys, time, os
 | 
						|
 | 
						|
# Check if local fail2ban module exists, and use if it exists by 
 | 
						|
# modifying the path. This is such that tests can be used in dev
 | 
						|
# environment.
 | 
						|
if os.path.exists("fail2ban/__init__.py"):
 | 
						|
	sys.path.insert(0, ".")
 | 
						|
from fail2ban.version import version
 | 
						|
from fail2ban.tests import banmanagertestcase
 | 
						|
from fail2ban.tests import clientreadertestcase
 | 
						|
from fail2ban.tests import failmanagertestcase
 | 
						|
from fail2ban.tests import filtertestcase
 | 
						|
from fail2ban.tests import servertestcase
 | 
						|
from fail2ban.tests import datedetectortestcase
 | 
						|
from fail2ban.tests import actiontestcase
 | 
						|
from fail2ban.tests import sockettestcase
 | 
						|
 | 
						|
from fail2ban.tests.utils import FormatterWithTraceBack
 | 
						|
from fail2ban.server.mytime import MyTime
 | 
						|
 | 
						|
from optparse import OptionParser, Option
 | 
						|
 | 
						|
def get_opt_parser():
 | 
						|
	# use module docstring for help output
 | 
						|
	p = OptionParser(
 | 
						|
				usage="%s [OPTIONS] [regexps]\n" % sys.argv[0] + __doc__,
 | 
						|
				version="%prog " + version)
 | 
						|
 | 
						|
	p.add_options([
 | 
						|
		Option('-l', "--log-level", type="choice",
 | 
						|
			   dest="log_level",
 | 
						|
			   choices=('debug', 'info', 'warn', 'error', 'fatal'),
 | 
						|
			   default=None,
 | 
						|
			   help="Log level for the logger to use during running tests"),
 | 
						|
		Option('-n', "--no-network", action="store_true",
 | 
						|
			   dest="no_network",
 | 
						|
			   help="Do not run tests that require the network"),
 | 
						|
		Option("-t", "--log-traceback", action='store_true',
 | 
						|
			   help="Enrich log-messages with compressed tracebacks"),
 | 
						|
		Option("--full-traceback", action='store_true',
 | 
						|
			   help="Either to make the tracebacks full, not compressed (as by default)"),
 | 
						|
 | 
						|
		])
 | 
						|
 | 
						|
	return p
 | 
						|
 | 
						|
parser = get_opt_parser()
 | 
						|
(opts, regexps) = parser.parse_args()
 | 
						|
 | 
						|
#
 | 
						|
# Logging
 | 
						|
#
 | 
						|
logSys = logging.getLogger("fail2ban")
 | 
						|
 | 
						|
# Numerical level of verbosity corresponding to a log "level"
 | 
						|
verbosity = {'debug': 3,
 | 
						|
			 'info': 2,
 | 
						|
			 'warn': 1,
 | 
						|
			 'error': 1,
 | 
						|
			 'fatal': 0,
 | 
						|
			 None: 1}[opts.log_level]
 | 
						|
 | 
						|
if opts.log_level is not None: # pragma: no cover
 | 
						|
	# so we had explicit settings
 | 
						|
	logSys.setLevel(getattr(logging, opts.log_level.upper()))
 | 
						|
else: # pragma: no cover
 | 
						|
	# suppress the logging but it would leave unittests' progress dots
 | 
						|
	# ticking, unless like with '-l fatal' which would be silent
 | 
						|
	# unless error occurs
 | 
						|
	logSys.setLevel(getattr(logging, 'FATAL'))
 | 
						|
 | 
						|
# Add the default logging handler
 | 
						|
stdout = logging.StreamHandler(sys.stdout)
 | 
						|
 | 
						|
fmt = ' %(message)s'
 | 
						|
 | 
						|
if opts.log_traceback:
 | 
						|
	Formatter = FormatterWithTraceBack
 | 
						|
	fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
 | 
						|
else:
 | 
						|
	Formatter = logging.Formatter
 | 
						|
 | 
						|
# Custom log format for the verbose tests runs
 | 
						|
if verbosity > 1: # pragma: no cover
 | 
						|
	stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))
 | 
						|
else: # pragma: no cover
 | 
						|
	# just prefix with the space
 | 
						|
	stdout.setFormatter(Formatter(fmt))
 | 
						|
logSys.addHandler(stdout)
 | 
						|
 | 
						|
#
 | 
						|
# Let know the version
 | 
						|
#
 | 
						|
if not opts.log_level or opts.log_level != 'fatal': # pragma: no cover
 | 
						|
	print "Fail2ban %s test suite. Python %s. Please wait..." \
 | 
						|
	    % (version, str(sys.version).replace('\n', ''))
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Gather the tests
 | 
						|
#
 | 
						|
if not len(regexps): # pragma: no cover
 | 
						|
	tests = unittest.TestSuite()
 | 
						|
else: # pragma: no cover
 | 
						|
	import re
 | 
						|
	class FilteredTestSuite(unittest.TestSuite):
 | 
						|
		_regexps = [re.compile(r) for r in regexps]
 | 
						|
		def addTest(self, suite):
 | 
						|
			suite_str = str(suite)
 | 
						|
			for r in self._regexps:
 | 
						|
				if r.search(suite_str):
 | 
						|
					super(FilteredTestSuite, self).addTest(suite)
 | 
						|
					return
 | 
						|
 | 
						|
	tests = FilteredTestSuite()
 | 
						|
 | 
						|
# Server
 | 
						|
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
 | 
						|
tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
 | 
						|
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
 | 
						|
# FailManager
 | 
						|
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
 | 
						|
# BanManager
 | 
						|
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
 | 
						|
# ClientReaders
 | 
						|
tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest))
 | 
						|
tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
 | 
						|
tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest))
 | 
						|
# CSocket and AsyncServer
 | 
						|
tests.addTest(unittest.makeSuite(sockettestcase.Socket))
 | 
						|
 | 
						|
# Filter
 | 
						|
if not opts.no_network:
 | 
						|
    tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
 | 
						|
if not opts.no_network:
 | 
						|
    tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
 | 
						|
    tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.JailTests))
 | 
						|
 | 
						|
# DateDetector
 | 
						|
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
 | 
						|
 | 
						|
#
 | 
						|
# Extensive use-tests of different available filters backends
 | 
						|
#
 | 
						|
 | 
						|
from fail2ban.server.filterpoll import FilterPoll
 | 
						|
filters = [FilterPoll]					  # always available
 | 
						|
 | 
						|
# Additional filters available only if external modules are available
 | 
						|
# yoh: Since I do not know better way for parametric tests
 | 
						|
#      with good old unittest
 | 
						|
try:
 | 
						|
	from fail2ban.server.filtergamin import FilterGamin
 | 
						|
	filters.append(FilterGamin)
 | 
						|
except Exception, e: # pragma: no cover
 | 
						|
	print "I: Skipping gamin backend testing. Got exception '%s'" % e
 | 
						|
 | 
						|
try:
 | 
						|
	from fail2ban.server.filterpyinotify import FilterPyinotify
 | 
						|
	filters.append(FilterPyinotify)
 | 
						|
except Exception, e: # pragma: no cover
 | 
						|
	print "I: Skipping pyinotify backend testing. Got exception '%s'" % e
 | 
						|
 | 
						|
for Filter_ in filters:
 | 
						|
	tests.addTest(unittest.makeSuite(
 | 
						|
		filtertestcase.get_monitor_failures_testcase(Filter_)))
 | 
						|
 | 
						|
# Server test for logging elements which break logging used to support
 | 
						|
# testcases analysis
 | 
						|
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
 | 
						|
 | 
						|
#
 | 
						|
# Run the tests
 | 
						|
#
 | 
						|
testRunner = unittest.TextTestRunner(verbosity=verbosity)
 | 
						|
 | 
						|
try:
 | 
						|
	# Set the time to a fixed, known value
 | 
						|
	# Sun Aug 14 12:00:00 CEST 2005
 | 
						|
	# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
 | 
						|
	old_TZ = os.environ.get('TZ', None)
 | 
						|
	os.environ['TZ'] = 'Europe/Zurich'
 | 
						|
	time.tzset()
 | 
						|
	MyTime.setTime(1124013600)
 | 
						|
 | 
						|
	tests_results = testRunner.run(tests)
 | 
						|
 | 
						|
finally: # pragma: no cover
 | 
						|
	# Just for the sake of it reset the TZ
 | 
						|
	# yoh: move all this into setup/teardown methods within tests
 | 
						|
	os.environ.pop('TZ')
 | 
						|
	if old_TZ:
 | 
						|
		os.environ['TZ'] = old_TZ
 | 
						|
	time.tzset()
 | 
						|
 | 
						|
if not tests_results.wasSuccessful(): # pragma: no cover
 | 
						|
	sys.exit(1)
 |