several test cases rewritten using new assertIn, assertNotIn (better as own from unittest, because support generators beautifying, etc.)

+ new forward compatibility method assertRaisesRegexp;
+ methods assertIn, assertNotIn, assertRaisesRegexp are test covered now;
+ easy-fix for distributions compatible test cases (e.g. fedora default backend is 'systemd'), (closes gh-1353, closes gh-1490)

cherry pick into 0.9 branch
pull/1492/head
sebres 2016-07-25 19:12:33 +02:00
parent 5797ea0ae2
commit 9d56079756
9 changed files with 151 additions and 40 deletions

View File

@ -51,7 +51,7 @@ if sys.version_info >= (2,7):
def testCategory(self):
categories = self.action.getCategories()
self.assertTrue("ssh" in categories)
self.assertIn("ssh", categories)
self.assertTrue(len(categories) >= 10)
self.assertRaises(

View File

@ -104,21 +104,21 @@ class SMTPActionTest(unittest.TestCase):
self.assertEqual(self.smtpd.rcpttos, ["root"])
subject = "Subject: [Fail2Ban] %s: banned %s" % (
self.jail.name, aInfo['ip'])
self.assertTrue(subject in self.smtpd.data.replace("\n", ""))
self.assertIn(subject, self.smtpd.data.replace("\n", ""))
self.assertTrue(
"%i attempts" % aInfo['failures'] in self.smtpd.data)
self.action.matches = "matches"
self.action.ban(aInfo)
self.assertTrue(aInfo['matches'] in self.smtpd.data)
self.assertIn(aInfo['matches'], self.smtpd.data)
self.action.matches = "ipjailmatches"
self.action.ban(aInfo)
self.assertTrue(aInfo['ipjailmatches'] in self.smtpd.data)
self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
self.action.matches = "ipmatches"
self.action.ban(aInfo)
self.assertTrue(aInfo['ipmatches'] in self.smtpd.data)
self.assertIn(aInfo['ipmatches'], self.smtpd.data)
def testOptions(self):
self.action.start()

View File

@ -66,12 +66,12 @@ class ExecuteActions(LogCaptureTestCase):
def testActionsManipulation(self):
self.__actions.add('test')
self.assertTrue(self.__actions['test'])
self.assertTrue('test' in self.__actions)
self.assertFalse('nonexistant action' in self.__actions)
self.assertIn('test', self.__actions)
self.assertNotIn('nonexistant action', self.__actions)
self.__actions.add('test1')
del self.__actions['test']
del self.__actions['test1']
self.assertFalse('test' in self.__actions)
self.assertNotIn('test', self.__actions)
self.assertEqual(len(self.__actions), 0)
self.__actions.setBanTime(127)

View File

@ -542,12 +542,12 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertTrue(actionReader.read())
actionReader.getOptions({}) # populate _opts
if not actionName.endswith('-common'):
self.assertTrue('Definition' in actionReader.sections(),
self.assertIn('Definition', actionReader.sections(),
msg="Action file %r is lacking [Definition] section" % actionConfig)
# all must have some actionban defined
self.assertTrue(actionReader._opts.get('actionban', '').strip(),
msg="Action file %r is lacking actionban" % actionConfig)
self.assertTrue('Init' in actionReader.sections(),
self.assertIn('Init', actionReader.sections(),
msg="Action file %r is lacking [Init] section" % actionConfig)
def testReadStockJailConf(self):
@ -599,7 +599,7 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertTrue(len(actName))
self.assertTrue(isinstance(actOpt, dict))
if actName == 'iptables-multiport':
self.assertTrue('port' in actOpt)
self.assertIn('port', actOpt)
actionReader = ActionReader(actName, jail, {},
share_config=CONFIG_DIR_SHARE_CFG, basedir=CONFIG_DIR)
@ -649,11 +649,13 @@ class JailsReaderTest(LogCaptureTestCase):
# and we know even some of them by heart
for j in ['sshd', 'recidive']:
# by default we have 'auto' backend ATM
self.assertTrue(['add', j, 'auto'] in comm_commands)
# by default we have 'auto' backend ATM, but some distributions can overwrite it,
# (e.g. fedora default is 'systemd') therefore let check it without backend...
self.assertIn(['add', j],
(cmd[:2] for cmd in comm_commands if len(cmd) == 3 and cmd[0] == 'add'))
# and warn on useDNS
self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands)
self.assertTrue(['start', j] in comm_commands)
self.assertIn(['set', j, 'usedns', 'warn'], comm_commands)
self.assertIn(['start', j], comm_commands)
# last commands should be the 'start' commands
self.assertEqual(comm_commands[-1][0], 'start')
@ -672,7 +674,7 @@ class JailsReaderTest(LogCaptureTestCase):
action_name = action.getName()
if '<blocktype>' in str(commands):
# Verify that it is among cInfo
self.assertTrue('blocktype' in action._initOpts)
self.assertIn('blocktype', action._initOpts)
# Verify that we have a call to set it up
blocktype_present = False
target_command = [jail_name, 'action', action_name]

View File

@ -145,7 +145,7 @@ class DatabaseTest(LogCaptureTestCase):
self.db.addLog(self.jail, self.fileContainer)
self.assertTrue(filename in self.db.getLogPaths(self.jail))
self.assertIn(filename, self.db.getLogPaths(self.jail))
os.remove(filename)
def testUpdateLog(self):
@ -376,17 +376,17 @@ class DatabaseTest(LogCaptureTestCase):
# Delete jail (just disabled it):
self.db.delJail(self.jail)
jails = self.db.getJailNames()
self.assertTrue(len(jails) == 1 and self.jail.name in jails)
self.assertIn(len(jails) == 1 and self.jail.name, jails)
jails = self.db.getJailNames(enabled=False)
self.assertTrue(len(jails) == 1 and self.jail.name in jails)
self.assertIn(len(jails) == 1 and self.jail.name, jails)
jails = self.db.getJailNames(enabled=True)
self.assertTrue(len(jails) == 0)
# Add it again - should just enable it:
self.db.addJail(self.jail)
jails = self.db.getJailNames()
self.assertTrue(len(jails) == 1 and self.jail.name in jails)
self.assertIn(len(jails) == 1 and self.jail.name, jails)
jails = self.db.getJailNames(enabled=True)
self.assertTrue(len(jails) == 1 and self.jail.name in jails)
self.assertIn(len(jails) == 1 and self.jail.name, jails)
jails = self.db.getJailNames(enabled=False)
self.assertTrue(len(jails) == 0)

View File

@ -1320,11 +1320,11 @@ class DNSUtilsTests(unittest.TestCase):
for i in xrange(5):
c.set(i, i)
self.assertEqual([c.get(i) for i in xrange(5)], [i for i in xrange(5)])
self.assertFalse(-1 in [c.get(i, -1) for i in xrange(5)])
self.assertNotIn(-1, (c.get(i, -1) for i in xrange(5)))
# add one - too many:
c.set(10, i)
# one element should be removed :
self.assertTrue(-1 in [c.get(i, -1) for i in xrange(5)])
self.assertIn(-1, (c.get(i, -1) for i in xrange(5)))
# test max size (not expired):
for i in xrange(10):
c.set(i, 1)

View File

@ -23,6 +23,7 @@ __license__ = "GPL"
import logging
import os
import re
import sys
import unittest
import tempfile
@ -32,6 +33,8 @@ import datetime
from glob import glob
from StringIO import StringIO
from utils import LogCaptureTestCase, logSys as DefLogSys
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger
from ..helpers import splitwords
from ..server.datetemplate import DatePatternRegex
@ -149,7 +152,7 @@ class SetupTest(unittest.TestCase):
% (sys.executable, self.setup))
class TestsUtilsTest(unittest.TestCase):
class TestsUtilsTest(LogCaptureTestCase):
def testmbasename(self):
self.assertEqual(mbasename("sample.py"), 'sample')
@ -184,12 +187,88 @@ class TestsUtilsTest(unittest.TestCase):
if not ('fail2ban-testcases' in s):
# we must be calling it from setup or nosetests but using at least
# nose's core etc
self.assertTrue('>' in s, msg="no '>' in %r" % s)
self.assertIn('>', s)
elif not ('coverage' in s):
# There is only "fail2ban-testcases" in this case, no true traceback
self.assertFalse('>' in s, msg="'>' present in %r" % s)
self.assertNotIn('>', s)
self.assertTrue(':' in s, msg="no ':' in %r" % s)
self.assertIn(':', s)
def _testAssertionErrorRE(self, regexp, fun, *args, **kwargs):
self.assertRaisesRegexp(AssertionError, regexp, fun, *args, **kwargs)
def testExtendedAssertRaisesRE(self):
## test _testAssertionErrorRE several fail cases:
def _key_err(msg):
raise KeyError(msg)
self.assertRaises(KeyError,
self._testAssertionErrorRE, r"^failed$",
_key_err, 'failed')
self.assertRaises(AssertionError,
self._testAssertionErrorRE, r"^failed$",
self.fail, '__failed__')
self._testAssertionErrorRE(r'failed.* does not match .*__failed__',
lambda: self._testAssertionErrorRE(r"^failed$",
self.fail, '__failed__')
)
## no exception in callable:
self.assertRaises(AssertionError,
self._testAssertionErrorRE, r"", int, 1)
self._testAssertionErrorRE(r'0 AssertionError not raised X.* does not match .*AssertionError not raised',
lambda: self._testAssertionErrorRE(r"^0 AssertionError not raised X$",
lambda: self._testAssertionErrorRE(r"", int, 1))
)
def testExtendedAssertMethods(self):
## assertIn, assertNotIn positive case:
self.assertIn('a', ['a', 'b', 'c', 'd'])
self.assertIn('a', ('a', 'b', 'c', 'd',))
self.assertIn('a', 'cba')
self.assertIn('a', (c for c in 'cba' if c != 'b'))
self.assertNotIn('a', ['b', 'c', 'd'])
self.assertNotIn('a', ('b', 'c', 'd',))
self.assertNotIn('a', 'cbd')
self.assertNotIn('a', (c.upper() for c in 'cba' if c != 'b'))
## assertIn, assertNotIn negative case:
self._testAssertionErrorRE(r"'a' unexpectedly found in 'cba'",
self.assertNotIn, 'a', 'cba')
self._testAssertionErrorRE(r"1 unexpectedly found in \[0, 1, 2\]",
self.assertNotIn, 1, xrange(3))
self._testAssertionErrorRE(r"'A' unexpectedly found in \['C', 'A'\]",
self.assertNotIn, 'A', (c.upper() for c in 'cba' if c != 'b'))
self._testAssertionErrorRE(r"'a' was not found in 'xyz'",
self.assertIn, 'a', 'xyz')
self._testAssertionErrorRE(r"5 was not found in \[0, 1, 2\]",
self.assertIn, 5, xrange(3))
self._testAssertionErrorRE(r"'A' was not found in \['C', 'B'\]",
self.assertIn, 'A', (c.upper() for c in 'cba' if c != 'a'))
## assertLogged, assertNotLogged positive case:
logSys = DefLogSys
self.pruneLog()
logSys.debug('test "xyz"')
self.assertLogged('test "xyz"')
self.assertLogged('test', 'xyz', all=True)
self.assertNotLogged('test', 'zyx', all=False)
self.assertNotLogged('test_zyx', 'zyx', all=True)
self.assertLogged('test', 'zyx', all=False)
self.pruneLog()
logSys.debug('xxxx "xxx"')
self.assertNotLogged('test "xyz"')
self.assertNotLogged('test', 'xyz', all=False)
self.assertNotLogged('test', 'xyz', 'zyx', all=True)
## assertLogged, assertNotLogged negative case:
self.pruneLog()
logSys.debug('test "xyz"')
self._testAssertionErrorRE(r"All of the .* were found present in the log",
self.assertNotLogged, 'test "xyz"')
self._testAssertionErrorRE(r"was found in the log",
self.assertNotLogged, 'test', 'xyz', all=True)
self._testAssertionErrorRE(r"was not found in the log",
self.assertLogged, 'test', 'zyx', all=True)
self._testAssertionErrorRE(r"None among .* was found in the log",
self.assertLogged, 'test_zyx', 'zyx', all=False)
self._testAssertionErrorRE(r"All of the .* were found present in the log",
self.assertNotLogged, 'test', 'xyz', all=False)
def testFormatterWithTraceBack(self):
strout = StringIO()

View File

@ -252,7 +252,7 @@ class Transmitter(TransmitterBase):
3) )
self.assertEqual(
self.transm.proceed(["stop", self.jailName]), (0, None))
self.assertTrue(self.jailName not in self.server._Server__jails)
self.assertNotIn(self.jailName, self.server._Server__jails)
def testStartStopAllJail(self):
self.server.addJail("TestJail2", FAST_BACKEND)
@ -269,8 +269,8 @@ class Transmitter(TransmitterBase):
3) )
self.assertEqual(self.transm.proceed(["stop", "all"]), (0, None))
self.assertTrue( Utils.wait_for( lambda: not len(self.server._Server__jails), 3) )
self.assertTrue(self.jailName not in self.server._Server__jails)
self.assertTrue("TestJail2" not in self.server._Server__jails)
self.assertNotIn(self.jailName, self.server._Server__jails)
self.assertNotIn("TestJail2", self.server._Server__jails)
def testJailIdle(self):
self.assertEqual(

View File

@ -22,6 +22,7 @@ __author__ = "Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"
import itertools
import logging
import optparse
import os
@ -323,16 +324,45 @@ def gatherTests(regexps=None, opts=None):
return tests
# forwards compatibility of unittest.TestCase for some early python versions
if not hasattr(unittest.TestCase, 'assertIn'):
def __assertIn(self, a, b, msg=None):
if a not in b: # pragma: no cover
self.fail(msg or "%r was not found in %r" % (a, b))
unittest.TestCase.assertIn = __assertIn
def __assertNotIn(self, a, b, msg=None):
if a in b: # pragma: no cover
self.fail(msg or "%r was found in %r" % (a, b))
unittest.TestCase.assertNotIn = __assertNotIn
#
# Forwards compatibility of unittest.TestCase for some early python versions
#
if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
def assertRaisesRegexp(self, exccls, regexp, fun, *args, **kwargs):
try:
fun(*args, **kwargs)
except exccls as e:
if re.search(regexp, e.message) is None:
self.fail('\"%s\" does not match \"%s\"' % (regexp, e.message))
else:
self.fail('%s not raised' % getattr(exccls, '__name__'))
unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp
# always custom following methods, because we use atm better version of both (support generators)
if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
def assertIn(self, a, b, msg=None):
bb = b
wrap = False
if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
b, bb = itertools.tee(b)
wrap = True
if a not in b:
if wrap: bb = list(bb)
msg = msg or "%r was not found in %r" % (a, bb)
self.fail(msg)
unittest.TestCase.assertIn = assertIn
def assertNotIn(self, a, b, msg=None):
bb = b
wrap = False
if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
b, bb = itertools.tee(b)
wrap = True
if a in b:
if wrap: bb = list(bb)
msg = msg or "%r unexpectedly found in %r" % (a, bb)
self.fail(msg)
unittest.TestCase.assertNotIn = assertNotIn
class LogCaptureTestCase(unittest.TestCase):