introduces new decorator/conditional helper in order to skip some STOCK-related test-cases (if running outside of stock-config environment).

pull/2025/head
sebres 2018-01-22 14:40:04 +01:00
parent 9af9ec25f5
commit 7a757645bb
6 changed files with 1042 additions and 1013 deletions

View File

@ -52,6 +52,7 @@ class SMTPActionTest(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
unittest.F2B.SkipIfCfgMissing(action='smtp.py')
super(SMTPActionTest, self).setUp()
self.jail = DummyJail()
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")

View File

@ -45,8 +45,6 @@ TEST_FILES_DIR_SHARE_CFG = {}
from .utils import CONFIG_DIR
CONFIG_DIR_SHARE_CFG = unittest.F2B.share_config
STOCK = os.path.exists(os.path.join('config', 'fail2ban.conf'))
IMPERFECT_CONFIG = os.path.join(os.path.dirname(__file__), 'config')
IMPERFECT_CONFIG_SHARE_CFG = {}
@ -246,15 +244,15 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(jail.isEnabled())
self.assertLogged("Invalid filter definition 'flt[test'")
if STOCK:
def testStockSSHJail(self):
jail = JailReader('sshd', basedir=CONFIG_DIR, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jail.read())
self.assertTrue(jail.getOptions())
self.assertFalse(jail.isEnabled())
self.assertEqual(jail.getName(), 'sshd')
jail.setName('ssh-funky-blocker')
self.assertEqual(jail.getName(), 'ssh-funky-blocker')
def testStockSSHJail(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
jail = JailReader('sshd', basedir=CONFIG_DIR, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jail.read())
self.assertTrue(jail.getOptions())
self.assertFalse(jail.isEnabled())
self.assertEqual(jail.getName(), 'sshd')
jail.setName('ssh-funky-blocker')
self.assertEqual(jail.getName(), 'ssh-funky-blocker')
def testSplitOption(self):
# Simple example
@ -307,6 +305,7 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(expected2, result)
def testVersionAgent(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
jail = JailReader('blocklisttest', force_enable=True, basedir=CONFIG_DIR)
# emulate jail.read(), because such jail not exists:
ConfigReader.read(jail, "jail");
@ -597,222 +596,226 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertNotLogged("Skipping...")
self.assertLogged("No file(s) found for glob /weapons/of/mass/destruction")
if STOCK:
def testReadStockActionConf(self):
for actionConfig in glob.glob(os.path.join(CONFIG_DIR, 'action.d', '*.conf')):
actionName = os.path.basename(actionConfig).replace('.conf', '')
actionReader = ActionReader(actionName, "TEST", {}, basedir=CONFIG_DIR)
self.assertTrue(actionReader.read())
try:
actionReader.getOptions({}) # populate _opts
except Exception as e: # pragma: no cover
self.fail("action %r\n%s: %s" % (actionName, type(e).__name__, e))
if not actionName.endswith('-common'):
self.assertIn('Definition', actionReader.sections(),
msg="Action file %r is lacking [Definition] section" % actionConfig)
# all must have some actionban defined
self.assertTrue(actionReader._opts.get('actionban', '').strip(),
msg="Action file %r is lacking actionban" % actionConfig)
# test name of jail is set in options (also if not supplied within parameters):
opts = actionReader.getCombined(
ignore=CommandAction._escapedTags | set(('timeout', 'bantime')))
self.assertEqual(opts.get('name'), 'TEST',
msg="Action file %r does not contains jail-name 'f2b-TEST'" % actionConfig)
# and the name is substituted (test several actions surely contains name-interpolation):
if actionName in ('pf', 'iptables-allports', 'iptables-multiport'):
#print('****', actionName, opts.get('actionstart', ''))
self.assertIn('f2b-TEST', opts.get('actionstart', ''),
msg="Action file %r: interpolation of actionstart does not contains jail-name 'f2b-TEST'" % actionConfig)
def testReadStockActionConf(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
for actionConfig in glob.glob(os.path.join(CONFIG_DIR, 'action.d', '*.conf')):
actionName = os.path.basename(actionConfig).replace('.conf', '')
actionReader = ActionReader(actionName, "TEST", {}, basedir=CONFIG_DIR)
self.assertTrue(actionReader.read())
try:
actionReader.getOptions({}) # populate _opts
except Exception as e: # pragma: no cover
self.fail("action %r\n%s: %s" % (actionName, type(e).__name__, e))
if not actionName.endswith('-common'):
self.assertIn('Definition', actionReader.sections(),
msg="Action file %r is lacking [Definition] section" % actionConfig)
# all must have some actionban defined
self.assertTrue(actionReader._opts.get('actionban', '').strip(),
msg="Action file %r is lacking actionban" % actionConfig)
# test name of jail is set in options (also if not supplied within parameters):
opts = actionReader.getCombined(
ignore=CommandAction._escapedTags | set(('timeout', 'bantime')))
self.assertEqual(opts.get('name'), 'TEST',
msg="Action file %r does not contains jail-name 'f2b-TEST'" % actionConfig)
# and the name is substituted (test several actions surely contains name-interpolation):
if actionName in ('pf', 'iptables-allports', 'iptables-multiport'):
#print('****', actionName, opts.get('actionstart', ''))
self.assertIn('f2b-TEST', opts.get('actionstart', ''),
msg="Action file %r: interpolation of actionstart does not contains jail-name 'f2b-TEST'" % actionConfig)
def testReadStockJailConf(self):
jails = JailsReader(basedir=CONFIG_DIR, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
comm_commands = jails.convert()
# by default None of the jails is enabled and we get no
# commands to communicate to the server
self.assertEqual(comm_commands, [])
def testReadStockJailConf(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
jails = JailsReader(basedir=CONFIG_DIR, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
comm_commands = jails.convert()
# by default None of the jails is enabled and we get no
# commands to communicate to the server
self.assertEqual(comm_commands, [])
# TODO: make sure this is handled well
## We should not "read" some bogus jail
#old_comm_commands = comm_commands[:] # make a copy
#self.assertRaises(ValueError, jails.getOptions, "BOGUS")
#self.printLog()
#self.assertLogged("No section: 'BOGUS'")
## and there should be no side-effects
#self.assertEqual(jails.convert(), old_comm_commands)
# TODO: make sure this is handled well
## We should not "read" some bogus jail
#old_comm_commands = comm_commands[:] # make a copy
#self.assertRaises(ValueError, jails.getOptions, "BOGUS")
#self.printLog()
#self.assertLogged("No section: 'BOGUS'")
## and there should be no side-effects
#self.assertEqual(jails.convert(), old_comm_commands)
allFilters = set()
allFilters = set()
# All jails must have filter and action set
# TODO: evolve into a parametric test
for jail in jails.sections():
if jail == 'INCLUDES':
continue
filterName = jails.get(jail, 'filter')
filterName, filterOpt = extractOptions(filterName)
allFilters.add(filterName)
self.assertTrue(len(filterName))
# moreover we must have a file for it
# and it must be readable as a Filter
filterReader = FilterReader(filterName, jail, filterOpt,
# All jails must have filter and action set
# TODO: evolve into a parametric test
for jail in jails.sections():
if jail == 'INCLUDES':
continue
filterName = jails.get(jail, 'filter')
filterName, filterOpt = extractOptions(filterName)
allFilters.add(filterName)
self.assertTrue(len(filterName))
# moreover we must have a file for it
# and it must be readable as a Filter
filterReader = FilterReader(filterName, jail, filterOpt,
share_config=CONFIG_DIR_SHARE_CFG, basedir=CONFIG_DIR)
self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine
filterReader.getOptions({}) # reads fine
# test if filter has failregex set
self.assertTrue(filterReader._opts.get('failregex', '').strip())
actions = jails.get(jail, 'action')
self.assertTrue(len(actions.strip()))
# somewhat duplicating here what is done in JailsReader if
# the jail is enabled
for act in actions.split('\n'):
actName, actOpt = extractOptions(act)
self.assertTrue(len(actName))
self.assertTrue(isinstance(actOpt, dict))
if actName == 'iptables-multiport':
self.assertIn('port', actOpt)
actionReader = ActionReader(actName, jail, {},
share_config=CONFIG_DIR_SHARE_CFG, basedir=CONFIG_DIR)
self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine
filterReader.getOptions({}) # reads fine
self.assertTrue(actionReader.read())
actionReader.getOptions({}) # populate _opts
cmds = actionReader.convert()
self.assertTrue(len(cmds))
# test if filter has failregex set
self.assertTrue(filterReader._opts.get('failregex', '').strip())
# all must have some actionban
self.assertTrue(actionReader._opts.get('actionban', '').strip())
actions = jails.get(jail, 'action')
self.assertTrue(len(actions.strip()))
# Verify that all filters found under config/ have a jail
def testReadStockJailFilterComplete(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=CONFIG_DIR_SHARE_CFG)
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
# grab all filter names
filters = set(os.path.splitext(os.path.split(a)[1])[0]
for a in glob.glob(os.path.join('config', 'filter.d', '*.conf'))
if not (a.endswith('common.conf') or a.endswith('-aggressive.conf')))
# get filters of all jails (filter names without options inside filter[...])
filters_jail = set(
extractOptions(jail.options['filter'])[0] for jail in jails.jails
)
self.maxDiff = None
self.assertTrue(filters.issubset(filters_jail),
"More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail))
self.assertTrue(filters_jail.issubset(filters),
"Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters))
# somewhat duplicating here what is done in JailsReader if
# the jail is enabled
for act in actions.split('\n'):
actName, actOpt = extractOptions(act)
self.assertTrue(len(actName))
self.assertTrue(isinstance(actOpt, dict))
if actName == 'iptables-multiport':
self.assertIn('port', actOpt)
def testReadStockJailConfForceEnabled(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
# more of a smoke test to make sure that no obvious surprises
# on users' systems when enabling shipped jails
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
comm_commands = jails.convert(allow_no_files=True)
actionReader = ActionReader(actName, jail, {},
share_config=CONFIG_DIR_SHARE_CFG, basedir=CONFIG_DIR)
self.assertTrue(actionReader.read())
actionReader.getOptions({}) # populate _opts
cmds = actionReader.convert()
self.assertTrue(len(cmds))
# by default we have lots of jails ;)
self.assertTrue(len(comm_commands))
# all must have some actionban
self.assertTrue(actionReader._opts.get('actionban', '').strip())
# some common sanity checks for commands
for command in comm_commands:
if len(command) >= 3 and [command[0], command[2]] == ['set', 'bantime']:
self.assertTrue(MyTime.str2seconds(command[3]) > 0)
# Verify that all filters found under config/ have a jail
def testReadStockJailFilterComplete(self):
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=CONFIG_DIR_SHARE_CFG)
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
# grab all filter names
filters = set(os.path.splitext(os.path.split(a)[1])[0]
for a in glob.glob(os.path.join('config', 'filter.d', '*.conf'))
if not (a.endswith('common.conf') or a.endswith('-aggressive.conf')))
# get filters of all jails (filter names without options inside filter[...])
filters_jail = set(
extractOptions(jail.options['filter'])[0] for jail in jails.jails
)
self.maxDiff = None
self.assertTrue(filters.issubset(filters_jail),
"More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail))
self.assertTrue(filters_jail.issubset(filters),
"Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters))
# and we know even some of them by heart
for j in ['sshd', 'recidive']:
# by default we have 'auto' backend ATM, but some distributions can overwrite it,
# (e.g. fedora default is 'systemd') therefore let check it without backend...
self.assertIn(['add', j],
(cmd[:2] for cmd in comm_commands if len(cmd) == 3 and cmd[0] == 'add'))
# and warn on useDNS
self.assertIn(['set', j, 'usedns', 'warn'], comm_commands)
self.assertIn(['start', j], comm_commands)
def testReadStockJailConfForceEnabled(self):
# more of a smoke test to make sure that no obvious surprises
# on users' systems when enabling shipped jails
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=CONFIG_DIR_SHARE_CFG) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
comm_commands = jails.convert(allow_no_files=True)
# last commands should be the 'start' commands
self.assertEqual(comm_commands[-1][0], 'start')
# by default we have lots of jails ;)
self.assertTrue(len(comm_commands))
for j in jails._JailsReader__jails:
actions = j._JailReader__actions
jail_name = j.getName()
# make sure that all of the jails have actions assigned,
# otherwise it makes little to no sense
self.assertTrue(len(actions),
msg="No actions found for jail %s" % jail_name)
# some common sanity checks for commands
for command in comm_commands:
if len(command) >= 3 and [command[0], command[2]] == ['set', 'bantime']:
self.assertTrue(MyTime.str2seconds(command[3]) > 0)
# Test for presence of blocktype (in relation to gh-232)
for action in actions:
commands = action.convert()
action_name = action.getName()
if '<blocktype>' in str(commands):
# Verify that it is among cInfo
self.assertIn('blocktype', action._initOpts)
# Verify that we have a call to set it up
blocktype_present = False
target_command = [jail_name, 'action', action_name]
for command in commands:
if (len(command) > 4 and command[0] == 'multi-set' and
command[1:4] == target_command):
blocktype_present = ('blocktype' in [cmd[0] for cmd in command[4]])
elif (len(command) > 5 and command[0] == 'set' and
command[1:4] == target_command and command[4] == 'blocktype'): # pragma: no cover - because of multi-set
blocktype_present = True
if blocktype_present:
break
self.assertTrue(
blocktype_present,
msg="Found no %s command among %s"
% (target_command, str(commands)) )
# and we know even some of them by heart
for j in ['sshd', 'recidive']:
# by default we have 'auto' backend ATM, but some distributions can overwrite it,
# (e.g. fedora default is 'systemd') therefore let check it without backend...
self.assertIn(['add', j],
(cmd[:2] for cmd in comm_commands if len(cmd) == 3 and cmd[0] == 'add'))
# and warn on useDNS
self.assertIn(['set', j, 'usedns', 'warn'], comm_commands)
self.assertIn(['start', j], comm_commands)
def testStockConfigurator(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
configurator = Configurator()
configurator.setBaseDir(CONFIG_DIR)
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
# last commands should be the 'start' commands
self.assertEqual(comm_commands[-1][0], 'start')
configurator.readEarly()
opts = configurator.getEarlyOptions()
# our current default settings
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
for j in jails._JailsReader__jails:
actions = j._JailReader__actions
jail_name = j.getName()
# make sure that all of the jails have actions assigned,
# otherwise it makes little to no sense
self.assertTrue(len(actions),
msg="No actions found for jail %s" % jail_name)
configurator.readAll()
configurator.getOptions()
configurator.convertToProtocol()
commands = configurator.getConfigStream()
# Test for presence of blocktype (in relation to gh-232)
for action in actions:
commands = action.convert()
action_name = action.getName()
if '<blocktype>' in str(commands):
# Verify that it is among cInfo
self.assertIn('blocktype', action._initOpts)
# Verify that we have a call to set it up
blocktype_present = False
target_command = [jail_name, 'action', action_name]
for command in commands:
if (len(command) > 4 and command[0] == 'multi-set' and
command[1:4] == target_command):
blocktype_present = ('blocktype' in [cmd[0] for cmd in command[4]])
elif (len(command) > 5 and command[0] == 'set' and
command[1:4] == target_command and command[4] == 'blocktype'): # pragma: no cover - because of multi-set
blocktype_present = True
if blocktype_present:
break
self.assertTrue(
blocktype_present,
msg="Found no %s command among %s"
% (target_command, str(commands)) )
# verify that dbfile comes before dbpurgeage
def find_set(option):
for i, e in enumerate(commands):
if e[0] == 'set' and e[1] == option:
return i
raise ValueError("Did not find command 'set %s' among commands %s"
% (option, commands))
def testStockConfigurator(self):
configurator = Configurator()
configurator.setBaseDir(CONFIG_DIR)
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
# Set up of logging should come first
self.assertEqual(find_set('syslogsocket'), 0)
self.assertEqual(find_set('loglevel'), 1)
self.assertEqual(find_set('logtarget'), 2)
# then dbfile should be before dbpurgeage
self.assertTrue(find_set('dbpurgeage') > find_set('dbfile'))
configurator.readEarly()
opts = configurator.getEarlyOptions()
# our current default settings
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
# and there is logging information left to be passed into the
# server
self.assertSortedEqual(commands,
[['set', 'dbfile',
'/var/lib/fail2ban/fail2ban.sqlite3'],
['set', 'dbpurgeage', '1d'],
['set', 'loglevel', "INFO"],
['set', 'logtarget', '/var/log/fail2ban.log'],
['set', 'syslogsocket', 'auto']])
configurator.readAll()
configurator.getOptions()
configurator.convertToProtocol()
commands = configurator.getConfigStream()
# verify that dbfile comes before dbpurgeage
def find_set(option):
for i, e in enumerate(commands):
if e[0] == 'set' and e[1] == option:
return i
raise ValueError("Did not find command 'set %s' among commands %s"
% (option, commands))
# Set up of logging should come first
self.assertEqual(find_set('syslogsocket'), 0)
self.assertEqual(find_set('loglevel'), 1)
self.assertEqual(find_set('logtarget'), 2)
# then dbfile should be before dbpurgeage
self.assertTrue(find_set('dbpurgeage') > find_set('dbfile'))
# and there is logging information left to be passed into the
# server
self.assertSortedEqual(commands,
[['set', 'dbfile',
'/var/lib/fail2ban/fail2ban.sqlite3'],
['set', 'dbpurgeage', '1d'],
['set', 'loglevel', "INFO"],
['set', 'logtarget', '/var/log/fail2ban.log'],
['set', 'syslogsocket', 'auto']])
# and if we force change configurator's fail2ban's baseDir
# there should be an error message (test visually ;) --
# otherwise just a code smoke test)
configurator._Configurator__jails.setBaseDir('/tmp')
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
# and if we force change configurator's fail2ban's baseDir
# there should be an error message (test visually ;) --
# otherwise just a code smoke test)
configurator._Configurator__jails.setBaseDir('/tmp')
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
@with_tmpdir
def testMultipleSameAction(self, basedir):

View File

@ -43,16 +43,14 @@ from .. import protocol
from ..server import server
from ..server.mytime import MyTime
from ..server.utils import Utils
from .utils import LogCaptureTestCase, logSys as DefLogSys, with_tmpdir, shutil, logging
from .utils import CONFIG_DIR as STOCK_CONF_DIR
from .utils import LogCaptureTestCase, logSys as DefLogSys, with_tmpdir, shutil, logging, \
STOCK, CONFIG_DIR as STOCK_CONF_DIR
from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
STOCK = exists(pjoin(STOCK_CONF_DIR, 'fail2ban.conf'))
CLIENT = "fail2ban-client"
SERVER = "fail2ban-server"
BIN = dirname(Fail2banServer.getServerPath())
@ -1171,6 +1169,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
"Jail 'test-jail1' started", all=True)
# test action.d/nginx-block-map.conf --
@unittest.F2B.skip_if_cfg_missing(action="nginx-block-map")
@with_foreground_server_thread(startextra={
# create log-file (avoid "not found" errors):
'create_before_start': ('%(tmp)s/blck-failures.log',),

View File

@ -24,7 +24,6 @@ __license__ = "GPL"
from __builtin__ import open as fopen
import unittest
import getpass
import os
import sys
import time, datetime
@ -43,14 +42,12 @@ from ..server.failmanager import FailManagerEmpty
from ..server.ipdns import DNSUtils, IPAddr
from ..server.mytime import MyTime
from ..server.utils import Utils, uni_decode
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase, \
CONFIG_DIR as STOCK_CONF_DIR
from .dummyjail import DummyJail
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
STOCK_CONF_DIR = "config"
STOCK = os.path.exists(os.path.join(STOCK_CONF_DIR, 'fail2ban.conf'))
# yoh: per Steven Hiscocks's insight while troubleshooting
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
@ -445,8 +442,7 @@ class IgnoreIPDNS(LogCaptureTestCase):
self.assertFalse(self.filter.inIgnoreIPList("128.178.222.70"))
def testIgnoreCmdApacheFakegooglebot(self):
if not STOCK: # pragma: no cover
raise unittest.SkipTest('Skip test because of no STOCK config')
unittest.F2B.SkipIfCfgMissing(stock=True)
cmd = os.path.join(STOCK_CONF_DIR, "filter.d/ignorecommands/apache-fakegooglebot")
## below test direct as python module:
mod = Utils.load_python_module(cmd)

File diff suppressed because it is too large Load Diff

View File

@ -59,6 +59,9 @@ if not CONFIG_DIR:
else:
CONFIG_DIR = '/etc/fail2ban'
# Indicates that we've stock config:
STOCK = os.path.exists(os.path.join(CONFIG_DIR, 'fail2ban.conf'))
# During the test cases (or setup) use fail2ban modules from main directory:
os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)))))
@ -187,6 +190,31 @@ class F2B(DefaultTestOptions):
pass
def SkipIfNoNetwork(self):
pass
def SkipIfCfgMissing(self, **kwargs):
"""Helper to check action/filter config is available
"""
if not STOCK: # pragma: no cover
if kwargs.get('stock'):
raise unittest.SkipTest('Skip test because of missing stock-config files')
for t in ('action', 'filter'):
v = kwargs.get(t)
if v is None: continue
if os.path.splitext(v)[1] == '': v += '.conf'
if not os.path.exists(os.path.join(CONFIG_DIR, t+'.d', v)):
raise unittest.SkipTest('Skip test because of missing %s-config for %r' % (t, v))
def skip_if_cfg_missing(self, **decargs):
"""Helper decorator to check action/filter config is available
"""
def _deco_wrapper(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
unittest.F2B.SkipIfCfgMissing(**decargs)
return f(self, *args, **kwargs)
return wrapper
return _deco_wrapper
def maxWaitTime(self,wtime):
if self.fast:
wtime = float(wtime) / 10