mirror of https://github.com/fail2ban/fail2ban
TST: Move test gathering to function is test utils
parent
4cc3a81cc1
commit
9e684abad7
|
@ -33,16 +33,8 @@ import unittest, logging, sys, time, os
|
|||
if os.path.exists("fail2ban/__init__.py"):
|
||||
sys.path.insert(0, ".")
|
||||
from fail2ban.version import version
|
||||
from fail2ban.tests import banmanagertestcase
|
||||
from fail2ban.tests import clientreadertestcase
|
||||
from fail2ban.tests import failmanagertestcase
|
||||
from fail2ban.tests import filtertestcase
|
||||
from fail2ban.tests import servertestcase
|
||||
from fail2ban.tests import datedetectortestcase
|
||||
from fail2ban.tests import actiontestcase
|
||||
from fail2ban.tests import sockettestcase
|
||||
|
||||
from fail2ban.tests.utils import FormatterWithTraceBack
|
||||
from fail2ban.tests.utils import FormatterWithTraceBack, gatherTests
|
||||
from fail2ban.server.mytime import MyTime
|
||||
|
||||
from optparse import OptionParser, Option
|
||||
|
@ -122,84 +114,7 @@ if not opts.log_level or opts.log_level != 'fatal': # pragma: no cover
|
|||
print "Fail2ban %s test suite. Python %s. Please wait..." \
|
||||
% (version, str(sys.version).replace('\n', ''))
|
||||
|
||||
|
||||
#
|
||||
# Gather the tests
|
||||
#
|
||||
if not len(regexps): # pragma: no cover
|
||||
tests = unittest.TestSuite()
|
||||
else: # pragma: no cover
|
||||
import re
|
||||
class FilteredTestSuite(unittest.TestSuite):
|
||||
_regexps = [re.compile(r) for r in regexps]
|
||||
def addTest(self, suite):
|
||||
suite_str = str(suite)
|
||||
for r in self._regexps:
|
||||
if r.search(suite_str):
|
||||
super(FilteredTestSuite, self).addTest(suite)
|
||||
return
|
||||
|
||||
tests = FilteredTestSuite()
|
||||
|
||||
# Server
|
||||
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
|
||||
tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
|
||||
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
|
||||
# FailManager
|
||||
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
|
||||
# BanManager
|
||||
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
|
||||
# ClientReaders
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.FilterReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest))
|
||||
# CSocket and AsyncServer
|
||||
tests.addTest(unittest.makeSuite(sockettestcase.Socket))
|
||||
|
||||
# Filter
|
||||
if not opts.no_network:
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
|
||||
if not opts.no_network:
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.JailTests))
|
||||
|
||||
# DateDetector
|
||||
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
|
||||
|
||||
#
|
||||
# Extensive use-tests of different available filters backends
|
||||
#
|
||||
|
||||
from fail2ban.server.filterpoll import FilterPoll
|
||||
filters = [FilterPoll] # always available
|
||||
|
||||
# Additional filters available only if external modules are available
|
||||
# yoh: Since I do not know better way for parametric tests
|
||||
# with good old unittest
|
||||
try:
|
||||
from fail2ban.server.filtergamin import FilterGamin
|
||||
filters.append(FilterGamin)
|
||||
except Exception, e: # pragma: no cover
|
||||
print "I: Skipping gamin backend testing. Got exception '%s'" % e
|
||||
|
||||
try:
|
||||
from fail2ban.server.filterpyinotify import FilterPyinotify
|
||||
filters.append(FilterPyinotify)
|
||||
except Exception, e: # pragma: no cover
|
||||
print "I: Skipping pyinotify backend testing. Got exception '%s'" % e
|
||||
|
||||
for Filter_ in filters:
|
||||
tests.addTest(unittest.makeSuite(
|
||||
filtertestcase.get_monitor_failures_testcase(Filter_)))
|
||||
|
||||
# Server test for logging elements which break logging used to support
|
||||
# testcases analysis
|
||||
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
|
||||
|
||||
tests = gatherTests(regexps, opts.no_network)
|
||||
#
|
||||
# Run the tests
|
||||
#
|
||||
|
|
|
@ -22,11 +22,13 @@ __author__ = "Yaroslav Halchenko"
|
|||
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
|
||||
__license__ = "GPL"
|
||||
|
||||
import logging, os, re, traceback, time
|
||||
import logging, os, re, traceback, time, unittest
|
||||
from os.path import basename, dirname
|
||||
|
||||
from fail2ban.server.mytime import MyTime
|
||||
|
||||
logSys = logging.getLogger(__name__)
|
||||
|
||||
#
|
||||
# Following "traceback" functions are adopted from PyMVPA distributed
|
||||
# under MIT/Expat and copyright by PyMVPA developers (i.e. me and
|
||||
|
@ -116,3 +118,89 @@ def tearDownMyTime():
|
|||
if old_TZ:
|
||||
os.environ['TZ'] = old_TZ
|
||||
time.tzset()
|
||||
|
||||
from fail2ban.tests import banmanagertestcase
|
||||
from fail2ban.tests import clientreadertestcase
|
||||
from fail2ban.tests import failmanagertestcase
|
||||
from fail2ban.tests import filtertestcase
|
||||
from fail2ban.tests import servertestcase
|
||||
from fail2ban.tests import datedetectortestcase
|
||||
from fail2ban.tests import actiontestcase
|
||||
from fail2ban.tests import sockettestcase
|
||||
|
||||
def gatherTests(regexps=None, no_network=False):
|
||||
if not regexps: # pragma: no cover
|
||||
tests = unittest.TestSuite()
|
||||
else: # pragma: no cover
|
||||
import re
|
||||
class FilteredTestSuite(unittest.TestSuite):
|
||||
_regexps = [re.compile(r) for r in regexps]
|
||||
def addTest(self, suite):
|
||||
suite_str = str(suite)
|
||||
for r in self._regexps:
|
||||
if r.search(suite_str):
|
||||
super(FilteredTestSuite, self).addTest(suite)
|
||||
return
|
||||
|
||||
tests = FilteredTestSuite()
|
||||
|
||||
# Server
|
||||
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
|
||||
tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
|
||||
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
|
||||
# FailManager
|
||||
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
|
||||
# BanManager
|
||||
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
|
||||
# ClientReaders
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.FilterReaderTest))
|
||||
tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest))
|
||||
# CSocket and AsyncServer
|
||||
tests.addTest(unittest.makeSuite(sockettestcase.Socket))
|
||||
|
||||
# Filter
|
||||
if not no_network:
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
|
||||
if not no_network:
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
|
||||
tests.addTest(unittest.makeSuite(filtertestcase.JailTests))
|
||||
|
||||
# DateDetector
|
||||
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
|
||||
|
||||
#
|
||||
# Extensive use-tests of different available filters backends
|
||||
#
|
||||
|
||||
from fail2ban.server.filterpoll import FilterPoll
|
||||
filters = [FilterPoll] # always available
|
||||
|
||||
# Additional filters available only if external modules are available
|
||||
# yoh: Since I do not know better way for parametric tests
|
||||
# with good old unittest
|
||||
try:
|
||||
from fail2ban.server.filtergamin import FilterGamin
|
||||
filters.append(FilterGamin)
|
||||
except Exception, e: # pragma: no cover
|
||||
logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e)
|
||||
|
||||
try:
|
||||
from fail2ban.server.filterpyinotify import FilterPyinotify
|
||||
filters.append(FilterPyinotify)
|
||||
except Exception, e: # pragma: no cover
|
||||
logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e)
|
||||
|
||||
for Filter_ in filters:
|
||||
tests.addTest(unittest.makeSuite(
|
||||
filtertestcase.get_monitor_failures_testcase(Filter_)))
|
||||
|
||||
# Server test for logging elements which break logging used to support
|
||||
# testcases analysis
|
||||
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
|
||||
|
||||
return tests
|
||||
|
|
Loading…
Reference in New Issue