mirror of https://github.com/fail2ban/fail2ban
				
				
				
			
		
			
				
	
	
		
			188 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			188 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
#!/usr/bin/python
 | 
						|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
 | 
						|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
 | 
						|
"""Script to run Fail2Ban tests battery
 | 
						|
"""
 | 
						|
 | 
						|
# This file is part of Fail2Ban.
 | 
						|
#
 | 
						|
# Fail2Ban is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# Fail2Ban is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with Fail2Ban; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 | 
						|
 | 
						|
# Author: Cyril Jaquier
 | 
						|
 | 
						|
__author__ = "Cyril Jaquier"
 | 
						|
__version__ = "$Revision$"
 | 
						|
__date__ = "$Date$"
 | 
						|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
 | 
						|
__license__ = "GPL"
 | 
						|
 | 
						|
 | 
						|
import unittest, logging, sys, time, os
 | 
						|
 | 
						|
from common.version import version
 | 
						|
from testcases import banmanagertestcase
 | 
						|
from testcases import clientreadertestcase
 | 
						|
from testcases import failmanagertestcase
 | 
						|
from testcases import filtertestcase
 | 
						|
from testcases import servertestcase
 | 
						|
from testcases import datedetectortestcase
 | 
						|
from testcases import actiontestcase
 | 
						|
from server.mytime import MyTime
 | 
						|
 | 
						|
from optparse import OptionParser, Option
 | 
						|
 | 
						|
def get_opt_parser():
 | 
						|
	# use module docstring for help output
 | 
						|
	p = OptionParser(
 | 
						|
				usage="%s [OPTIONS]\n" % sys.argv[0] + __doc__,
 | 
						|
				version="%prog " + version)
 | 
						|
 | 
						|
	p.add_options([
 | 
						|
		Option('-l', "--log-level", type="choice",
 | 
						|
			   dest="log_level",
 | 
						|
			   choices=('debug', 'info', 'warn', 'error', 'fatal'),
 | 
						|
			   default=None,
 | 
						|
			   help="Log level for the logger to use during running tests"),
 | 
						|
		])
 | 
						|
 | 
						|
	return p
 | 
						|
 | 
						|
parser = get_opt_parser()
 | 
						|
(opts, files) = parser.parse_args()
 | 
						|
assert(not len(files))
 | 
						|
 | 
						|
#
 | 
						|
# Logging
 | 
						|
#
 | 
						|
logSys = logging.getLogger("fail2ban")
 | 
						|
 | 
						|
# Numerical level of verbosity corresponding to a log "level"
 | 
						|
verbosity = {'debug': 3,
 | 
						|
			 'info': 2,
 | 
						|
			 'warn': 1,
 | 
						|
			 'error': 1,
 | 
						|
			 'fatal': 0,
 | 
						|
			 None: 1}[opts.log_level]
 | 
						|
 | 
						|
if opts.log_level is not None:
 | 
						|
	# so we had explicit settings
 | 
						|
	logSys.setLevel(getattr(logging, opts.log_level.upper()))
 | 
						|
else:
 | 
						|
	# suppress the logging but it would leave unittests' progress dots
 | 
						|
	# ticking, unless like with '-l fatal' which would be silent
 | 
						|
	# unless error occurs
 | 
						|
	logSys.setLevel(getattr(logging, 'FATAL'))
 | 
						|
 | 
						|
# Add the default logging handler
 | 
						|
stdout = logging.StreamHandler(sys.stdout)
 | 
						|
# Custom log format for the verbose tests runs
 | 
						|
if verbosity > 1:
 | 
						|
	stdout.setFormatter(logging.Formatter(' %(asctime)-15s %(thread)s %(message)s'))
 | 
						|
else:
 | 
						|
	# just prefix with the space
 | 
						|
	stdout.setFormatter(logging.Formatter(' %(message)s'))
 | 
						|
logSys.addHandler(stdout)
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Let know the version
 | 
						|
#
 | 
						|
if not opts.log_level or opts.log_level != 'fatal':
 | 
						|
	print "Fail2ban " + version + " test suite. Please wait..."
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Gather the tests
 | 
						|
#
 | 
						|
tests = unittest.TestSuite()
 | 
						|
 | 
						|
# Server
 | 
						|
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
 | 
						|
#tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
 | 
						|
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
 | 
						|
# FailManager
 | 
						|
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
 | 
						|
# BanManager
 | 
						|
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
 | 
						|
# ClientReader
 | 
						|
tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
 | 
						|
 | 
						|
from server.filterpoll import FilterPoll
 | 
						|
filters = [FilterPoll]					  # always available
 | 
						|
 | 
						|
# Additional filters available only if external modules are available
 | 
						|
# yoh: Since I do not know better way for parametric tests
 | 
						|
#      with good old unittest
 | 
						|
try:
 | 
						|
	from server.filtergamin import FilterGamin
 | 
						|
	# That shmug plain doesn't work and stalls things ATM
 | 
						|
	# filters.append(FilterGamin)
 | 
						|
except:
 | 
						|
	pass
 | 
						|
 | 
						|
try:
 | 
						|
	from server.filterpyinotify import FilterPyinotify
 | 
						|
	filters.append(FilterPyinotify)
 | 
						|
except:
 | 
						|
	pass
 | 
						|
 | 
						|
for Filter_ in filters:
 | 
						|
	tests.addTest(unittest.makeSuite(
 | 
						|
		filtertestcase.get_monitor_failures_testcase(Filter_)))
 | 
						|
 | 
						|
# yoh: adding them (in particular datadetectortestscase before above
 | 
						|
#      get_monitor_failures_testcase's makes them fail (probably due
 | 
						|
#      to additional thread making it busier or smth like
 | 
						|
#      that)... TODO
 | 
						|
 | 
						|
# Filter
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
 | 
						|
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
 | 
						|
 | 
						|
# DateDetector
 | 
						|
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
 | 
						|
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Run the tests
 | 
						|
#
 | 
						|
testRunner = unittest.TextTestRunner(verbosity=verbosity)
 | 
						|
 | 
						|
try:
 | 
						|
	# Set the time to a fixed, known value
 | 
						|
	# Sun Aug 14 12:00:00 CEST 2005
 | 
						|
	# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
 | 
						|
	old_TZ = os.environ.get('TZ', None)
 | 
						|
	os.environ['TZ'] = 'Europe/Zurich'
 | 
						|
	time.tzset()
 | 
						|
	MyTime.setTime(1124013600)
 | 
						|
 | 
						|
	tests_results = testRunner.run(tests)
 | 
						|
 | 
						|
finally:
 | 
						|
	# Just for the sake of it reset the TZ
 | 
						|
	# yoh: move all this into setup/teardown methods within tests
 | 
						|
	os.environ.pop('TZ')
 | 
						|
	if old_TZ:
 | 
						|
		os.environ['TZ'] = old_TZ
 | 
						|
	time.tzset()
 | 
						|
 | 
						|
if not tests_results.wasSuccessful():
 | 
						|
	sys.exit(1)
 |