|
|
|
#!/usr/bin/python
|
|
|
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
|
|
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
|
|
|
"""Script to run Fail2Ban tests battery
|
|
|
|
"""
|
|
|
|
|
|
|
|
# This file is part of Fail2Ban.
|
|
|
|
#
|
|
|
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# Fail2Ban is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with Fail2Ban; if not, write to the Free Software
|
|
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
|
|
|
|
__author__ = "Cyril Jaquier"
|
|
|
|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012- Yaroslav Halchenko"
|
|
|
|
__license__ = "GPL"
|
|
|
|
|
|
|
|
|
|
|
|
import unittest, logging, sys, time, os
|
|
|
|
|
|
|
|
if sys.version_info >= (2, 6):
|
|
|
|
import json
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
import simplejson as json
|
|
|
|
except ImportError:
|
|
|
|
json = None
|
|
|
|
|
|
|
|
from common.version import version
|
|
|
|
from testcases import banmanagertestcase
|
|
|
|
from testcases import clientreadertestcase
|
|
|
|
from testcases import failmanagertestcase
|
|
|
|
from testcases import filtertestcase
|
|
|
|
from testcases import servertestcase
|
|
|
|
from testcases import datedetectortestcase
|
|
|
|
from testcases import actiontestcase
|
|
|
|
from testcases import sockettestcase
|
|
|
|
from testcases import misctestcase
|
|
|
|
if json:
|
|
|
|
from testcases import samplestestcase
|
|
|
|
|
|
|
|
from testcases.utils import FormatterWithTraceBack
|
|
|
|
from server.mytime import MyTime
|
|
|
|
|
|
|
|
from optparse import OptionParser, Option
|
|
|
|
|
|
|
|
def get_opt_parser():
	"""Build the command-line parser of the test runner.

	The usage text consists of the invocation line followed by the module
	docstring, so the latter shows up in the ``--help`` output.  Positional
	arguments are not interpreted by the parser; the usage line advertises
	them as regexps.

	Returns the configured ``OptionParser`` instance.
	"""
	usage_text = "%s [OPTIONS] [regexps]\n" % sys.argv[0] + __doc__
	p = OptionParser(usage=usage_text, version="%prog " + version)

	# Logging verbosity; left as None when the switch is not given
	p.add_option(
		'-l', "--log-level",
		type="choice",
		dest="log_level",
		choices=('heavydebug', 'debug', 'info', 'warning', 'error', 'fatal'),
		default=None,
		help="Log level for the logger to use during running tests")

	# Opt out of the suites which need network access
	p.add_option(
		'-n', "--no-network",
		action="store_true",
		dest="no_network",
		help="Do not run tests that require the network")

	# Traceback decoration of the log messages (destinations are derived
	# by optparse from the long option names)
	p.add_option(
		"-t", "--log-traceback",
		action='store_true',
		help="Enrich log-messages with compressed tracebacks")
	p.add_option(
		"--full-traceback",
		action='store_true',
		help="Either to make the tracebacks full, not compressed (as by default)")

	return p
|
|
|
|
|
|
|
|
# Parse the command line: switches end up in ``opts``, whatever is left over
# (the regexps selecting the suites to run) in ``regexps``
parser = get_opt_parser()
opts, regexps = parser.parse_args()

#
# Logging
#
logSys = logging.getLogger("fail2ban")

# Numerical level of verbosity corresponding to each selectable log "level";
# the None entry covers the case of no --log-level being given
_VERBOSITY_BY_LOG_LEVEL = {
	None: 1,
	'fatal': 0,
	'error': 1,
	'warning': 1,
	'info': 2,
	'debug': 3,
	'heavydebug': 4,
}
verbosity = _VERBOSITY_BY_LOG_LEVEL[opts.log_level]
|
|
|
|
|
|
|
|
if opts.log_level is not None: # pragma: no cover
	# so we had explicit settings
	if opts.log_level == 'heavydebug':
		# The standard logging module has no HEAVYDEBUG constant, so a plain
		# getattr() would die with AttributeError for this (valid) choice.
		# Honour logging.HEAVYDEBUG if something has registered it, otherwise
		# fall back to 5, which is below logging.DEBUG (10) and therefore
		# lets every record through.
		logSys.setLevel(getattr(logging, 'HEAVYDEBUG', 5))
	else:
		# remaining choices map 1:1 onto the logging.<LEVEL> constants
		logSys.setLevel(getattr(logging, opts.log_level.upper()))
else: # pragma: no cover
	# suppress the logging but it would leave unittests' progress dots
	# ticking, unless like with '-l fatal' which would be silent
	# unless error occurs
	logSys.setLevel(logging.FATAL)
|
|
|
|
|
|
|
|
# Default logging handler: everything goes to the standard output
stdout = logging.StreamHandler(sys.stdout)

# Every message is at least prefixed with a space
fmt = ' %(message)s'

if not opts.log_traceback:
	Formatter = logging.Formatter
else:
	# Prepend the traceback field: full one (tb) on --full-traceback,
	# the compressed one (tbc) otherwise
	Formatter = FormatterWithTraceBack
	if opts.full_traceback:
		fmt = ' %(tb)s' + fmt
	else:
		fmt = ' %(tbc)s' + fmt

if verbosity <= 1: # pragma: no cover
	# quiet runs: keep the bare format
	stdout.setFormatter(Formatter(fmt))
else: # pragma: no cover
	# verbose runs: additionally show the timestamp and the thread
	stdout.setFormatter(Formatter(' %(asctime)-15s %(thread)s' + fmt))

logSys.addHandler(stdout)
|
|
|
|
|
|
|
|
#
# Announce the Fail2Ban and Python versions, unless asked to stay silent
# with '-l fatal'
#
if opts.log_level != 'fatal': # pragma: no cover
	print("Fail2ban %s test suite. Python %s. Please wait..."
		% (version, str(sys.version).replace('\n', '')))
|
|
|
|
|
|
|
|
|
|
|
|
#
# Gather the tests
#
if regexps: # pragma: no cover
	import re

	class FilteredTestSuite(unittest.TestSuite):
		"""Test suite which accepts only the tests matching a regexp.

		A test (or a whole sub-suite) is kept when its string
		representation matches at least one of the regexps given on the
		command line; anything else is silently dropped.
		"""

		_regexps = [re.compile(pattern) for pattern in regexps]

		def addTest(self, suite):
			description = str(suite)
			for regexp in self._regexps:
				if regexp.search(description):
					break
			else:
				# none of the regexps matched -- leave it out
				return
			super(FilteredTestSuite, self).addTest(suite)

	tests = FilteredTestSuite()
else: # pragma: no cover
	# no selection requested -- plain suite which takes everything
	tests = unittest.TestSuite()
|
|
|
|
|
|
|
|
# Suites which are always part of the run, listed in execution order
for _testcase in (
		# Server
		# (servertestcase.StartStop is disabled)
		servertestcase.Transmitter,
		servertestcase.JailTests,
		actiontestcase.ExecuteAction,
		# FailManager
		failmanagertestcase.AddFailure,
		# BanManager
		banmanagertestcase.AddFailure,
		# ClientReaders
		clientreadertestcase.ConfigReaderTest,
		clientreadertestcase.JailReaderTest,
		clientreadertestcase.JailsReaderTest,
		# CSocket and AsyncServer
		sockettestcase.Socket,
		# Misc helpers
		misctestcase.HelpersTest,
		misctestcase.SetupTest,
		misctestcase.TestsUtilsTest,
		misctestcase.CustomDateFormatsTest,
	):
	tests.addTest(unittest.makeSuite(_testcase))
|
|
|
|
|
|
|
|
# Filter
# Each entry pairs a test case with a flag telling whether it is left out
# when --no-network was given; the order is the execution order
for _testcase, _needs_network in (
		(filtertestcase.IgnoreIP, True),
		(filtertestcase.LogFile, False),
		(filtertestcase.LogFileMonitor, False),
		(filtertestcase.GetFailures, True),
		(filtertestcase.DNSUtilsTests, True),
		(filtertestcase.JailTests, False),
	):
	if _needs_network and opts.no_network:
		continue
	tests.addTest(unittest.makeSuite(_testcase))
|
|
|
|
|
|
|
|
# DateDetector
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))

# Filter Regex tests with sample logs -- only possible with a json module
if not json:
	print("I: Skipping filter samples testing. No simplejson/json module")
else:
	tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
|
|
|
|
|
|
|
#
# Extensive use-tests of different available filters backends
#

from server.filterpoll import FilterPoll
filters = [FilterPoll]					 # always available

# Additional filters available only if external modules are available
# yoh: Since I do not know better way for parametric tests
#      with good old unittest
# The caught exception is fetched through sys.exc_info() so that the same
# spelling is understood by old and new Python versions alike
try:
	from server.filtergamin import FilterGamin
	filters.append(FilterGamin)
except Exception: # pragma: no cover
	e = sys.exc_info()[1]
	print("I: Skipping gamin backend testing. Got exception '%s'" % e)

try:
	from server.filterpyinotify import FilterPyinotify
	filters.append(FilterPyinotify)
except Exception: # pragma: no cover
	e = sys.exc_info()[1]
	print("I: Skipping pyinotify backend testing. Got exception '%s'" % e)

# One monitoring test case per backend which could be imported
for Filter_ in filters:
	tests.addTest(
		unittest.makeSuite(
			filtertestcase.get_monitor_failures_testcase(Filter_)
		)
	)

# Server test for logging elements which break logging used to support
# testcases analysis
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
|
|
|
|
|
|
|
|
#
# Run the tests
#
testRunner = unittest.TextTestRunner(verbosity=verbosity)

# Remember the caller's TZ (None when unset) before entering the guarded
# region, so the cleanup below can always rely on it being defined
old_TZ = os.environ.get('TZ', None)

try:
	# Set the time to a fixed, known value
	# Sun Aug 14 12:00:00 CEST 2005
	# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
	os.environ['TZ'] = 'Europe/Zurich'
	time.tzset()
	MyTime.setTime(1124013600)

	tests_results = testRunner.run(tests)

finally: # pragma: no cover
	# Just for the sake of it reset the TZ
	# yoh: move all this into setup/teardown methods within tests
	if old_TZ is None:
		# TZ was not set originally; tolerate it having been removed in the
		# meantime so that the cleanup never masks an error from the run
		os.environ.pop('TZ', None)
	else:
		# restore the original value verbatim, an empty string included
		os.environ['TZ'] = old_TZ
	time.tzset()

# Report a failed run through the exit status
if not tests_results.wasSuccessful(): # pragma: no cover
	sys.exit(1)
|