From 9d56079756ca6aef901a69041786ab137d0b5ae3 Mon Sep 17 00:00:00 2001 From: sebres Date: Mon, 25 Jul 2016 19:12:33 +0200 Subject: [PATCH] several test cases rewritten using new assertIn, assertNotIn (better as own from unittest, because support generators beautifying, etc.) + new forward compatibility method assertRaisesRegexp; + methods assertIn, assertNotIn, assertRaisesRegexp are test covered now; + easy-fix for distributions compatible test cases (e.g. fedora default backend is 'systemd'), (closes gh-1353, closes gh-1490) cherry pick into 0.9 branch --- fail2ban/tests/action_d/test_badips.py | 2 +- fail2ban/tests/action_d/test_smtp.py | 8 +-- fail2ban/tests/actionstestcase.py | 6 +- fail2ban/tests/clientreadertestcase.py | 18 +++--- fail2ban/tests/databasetestcase.py | 10 +-- fail2ban/tests/filtertestcase.py | 4 +- fail2ban/tests/misctestcase.py | 87 ++++++++++++++++++++++++-- fail2ban/tests/servertestcase.py | 6 +- fail2ban/tests/utils.py | 50 ++++++++++++--- 9 files changed, 151 insertions(+), 40 deletions(-) diff --git a/fail2ban/tests/action_d/test_badips.py b/fail2ban/tests/action_d/test_badips.py index 97dabada..2b9090cc 100644 --- a/fail2ban/tests/action_d/test_badips.py +++ b/fail2ban/tests/action_d/test_badips.py @@ -51,7 +51,7 @@ if sys.version_info >= (2,7): def testCategory(self): categories = self.action.getCategories() - self.assertTrue("ssh" in categories) + self.assertIn("ssh", categories) self.assertTrue(len(categories) >= 10) self.assertRaises( diff --git a/fail2ban/tests/action_d/test_smtp.py b/fail2ban/tests/action_d/test_smtp.py index 1385fe82..5c8b1923 100644 --- a/fail2ban/tests/action_d/test_smtp.py +++ b/fail2ban/tests/action_d/test_smtp.py @@ -104,21 +104,21 @@ class SMTPActionTest(unittest.TestCase): self.assertEqual(self.smtpd.rcpttos, ["root"]) subject = "Subject: [Fail2Ban] %s: banned %s" % ( self.jail.name, aInfo['ip']) - self.assertTrue(subject in self.smtpd.data.replace("\n", "")) + self.assertIn(subject, self.smtpd.data.replace("\n", "")) self.assertTrue( "%i attempts" % aInfo['failures'] in self.smtpd.data) self.action.matches = "matches" self.action.ban(aInfo) - self.assertTrue(aInfo['matches'] in self.smtpd.data) + self.assertIn(aInfo['matches'], self.smtpd.data) self.action.matches = "ipjailmatches" self.action.ban(aInfo) - self.assertTrue(aInfo['ipjailmatches'] in self.smtpd.data) + self.assertIn(aInfo['ipjailmatches'], self.smtpd.data) self.action.matches = "ipmatches" self.action.ban(aInfo) - self.assertTrue(aInfo['ipmatches'] in self.smtpd.data) + self.assertIn(aInfo['ipmatches'], self.smtpd.data) def testOptions(self): self.action.start() diff --git a/fail2ban/tests/actionstestcase.py b/fail2ban/tests/actionstestcase.py index dff0d4c6..8969db36 100644 --- a/fail2ban/tests/actionstestcase.py +++ b/fail2ban/tests/actionstestcase.py @@ -66,12 +66,12 @@ class ExecuteActions(LogCaptureTestCase): def testActionsManipulation(self): self.__actions.add('test') self.assertTrue(self.__actions['test']) - self.assertTrue('test' in self.__actions) - self.assertFalse('nonexistant action' in self.__actions) + self.assertIn('test', self.__actions) + self.assertNotIn('nonexistant action', self.__actions) self.__actions.add('test1') del self.__actions['test'] del self.__actions['test1'] - self.assertFalse('test' in self.__actions) + self.assertNotIn('test', self.__actions) self.assertEqual(len(self.__actions), 0) self.__actions.setBanTime(127) diff --git a/fail2ban/tests/clientreadertestcase.py b/fail2ban/tests/clientreadertestcase.py index 521fda2e..39fd0daa 100644 --- a/fail2ban/tests/clientreadertestcase.py +++ b/fail2ban/tests/clientreadertestcase.py @@ -542,12 +542,12 @@ class JailsReaderTest(LogCaptureTestCase): self.assertTrue(actionReader.read()) actionReader.getOptions({}) # populate _opts if not actionName.endswith('-common'): - self.assertTrue('Definition' in actionReader.sections(), + self.assertIn('Definition', actionReader.sections(), msg="Action file %r is lacking [Definition] section" % actionConfig) # all must have some actionban defined self.assertTrue(actionReader._opts.get('actionban', '').strip(), msg="Action file %r is lacking actionban" % actionConfig) - self.assertTrue('Init' in actionReader.sections(), + self.assertIn('Init', actionReader.sections(), msg="Action file %r is lacking [Init] section" % actionConfig) def testReadStockJailConf(self): @@ -599,7 +599,7 @@ class JailsReaderTest(LogCaptureTestCase): self.assertTrue(len(actName)) self.assertTrue(isinstance(actOpt, dict)) if actName == 'iptables-multiport': - self.assertTrue('port' in actOpt) + self.assertIn('port', actOpt) actionReader = ActionReader(actName, jail, {}, share_config=CONFIG_DIR_SHARE_CFG, basedir=CONFIG_DIR) @@ -649,11 +649,13 @@ class JailsReaderTest(LogCaptureTestCase): # and we know even some of them by heart for j in ['sshd', 'recidive']: - # by default we have 'auto' backend ATM - self.assertTrue(['add', j, 'auto'] in comm_commands) + # by default we have 'auto' backend ATM, but some distributions can overwrite it, + # (e.g. fedora default is 'systemd') therefore let check it without backend... + self.assertIn(['add', j], + (cmd[:2] for cmd in comm_commands if len(cmd) == 3 and cmd[0] == 'add')) # and warn on useDNS - self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands) - self.assertTrue(['start', j] in comm_commands) + self.assertIn(['set', j, 'usedns', 'warn'], comm_commands) + self.assertIn(['start', j], comm_commands) # last commands should be the 'start' commands self.assertEqual(comm_commands[-1][0], 'start') @@ -672,7 +674,7 @@ class JailsReaderTest(LogCaptureTestCase): action_name = action.getName() if '' in str(commands): # Verify that it is among cInfo - self.assertTrue('blocktype' in action._initOpts) + self.assertIn('blocktype', action._initOpts) # Verify that we have a call to set it up blocktype_present = False target_command = [jail_name, 'action', action_name] diff --git a/fail2ban/tests/databasetestcase.py b/fail2ban/tests/databasetestcase.py index 5d83710f..d039c920 100644 --- a/fail2ban/tests/databasetestcase.py +++ b/fail2ban/tests/databasetestcase.py @@ -145,7 +145,7 @@ class DatabaseTest(LogCaptureTestCase): self.db.addLog(self.jail, self.fileContainer) - self.assertTrue(filename in self.db.getLogPaths(self.jail)) + self.assertIn(filename, self.db.getLogPaths(self.jail)) os.remove(filename) def testUpdateLog(self): @@ -376,17 +376,17 @@ class DatabaseTest(LogCaptureTestCase): # Delete jail (just disabled it): self.db.delJail(self.jail) jails = self.db.getJailNames() - self.assertTrue(len(jails) == 1 and self.jail.name in jails) + self.assertIn(len(jails) == 1 and self.jail.name, jails) jails = self.db.getJailNames(enabled=False) - self.assertTrue(len(jails) == 1 and self.jail.name in jails) + self.assertIn(len(jails) == 1 and self.jail.name, jails) jails = self.db.getJailNames(enabled=True) self.assertTrue(len(jails) == 0) # Add it again - should just enable it: self.db.addJail(self.jail) jails = self.db.getJailNames() - self.assertTrue(len(jails) == 1 and self.jail.name in jails) + self.assertIn(len(jails) == 1 and self.jail.name, jails) jails = self.db.getJailNames(enabled=True) - self.assertTrue(len(jails) == 1 and self.jail.name in jails) + self.assertIn(len(jails) == 1 and self.jail.name, jails) jails = self.db.getJailNames(enabled=False) self.assertTrue(len(jails) == 0) diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index dd3468ed..7907965b 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -1320,11 +1320,11 @@ class DNSUtilsTests(unittest.TestCase): for i in xrange(5): c.set(i, i) self.assertEqual([c.get(i) for i in xrange(5)], [i for i in xrange(5)]) - self.assertFalse(-1 in [c.get(i, -1) for i in xrange(5)]) + self.assertNotIn(-1, (c.get(i, -1) for i in xrange(5))) # add one - too many: c.set(10, i) # one element should be removed : - self.assertTrue(-1 in [c.get(i, -1) for i in xrange(5)]) + self.assertIn(-1, (c.get(i, -1) for i in xrange(5))) # test max size (not expired): for i in xrange(10): c.set(i, 1) diff --git a/fail2ban/tests/misctestcase.py b/fail2ban/tests/misctestcase.py index f72d9316..df347e77 100644 --- a/fail2ban/tests/misctestcase.py +++ b/fail2ban/tests/misctestcase.py @@ -23,6 +23,7 @@ __license__ = "GPL" import logging import os +import re import sys import unittest import tempfile @@ -32,6 +33,8 @@ import datetime from glob import glob from StringIO import StringIO +from utils import LogCaptureTestCase, logSys as DefLogSys + from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger from ..helpers import splitwords from ..server.datetemplate import DatePatternRegex @@ -149,7 +152,7 @@ class SetupTest(unittest.TestCase): % (sys.executable, self.setup)) -class TestsUtilsTest(unittest.TestCase): +class TestsUtilsTest(LogCaptureTestCase): def testmbasename(self): self.assertEqual(mbasename("sample.py"), 'sample') @@ -184,12 +187,88 @@ class TestsUtilsTest(unittest.TestCase): if not ('fail2ban-testcases' in s): # we must be calling it from setup or nosetests but using at least # nose's core etc - self.assertTrue('>' in s, msg="no '>' in %r" % s) + self.assertIn('>', s) elif not ('coverage' in s): # There is only "fail2ban-testcases" in this case, no true traceback - self.assertFalse('>' in s, msg="'>' present in %r" % s) + self.assertNotIn('>', s) - self.assertTrue(':' in s, msg="no ':' in %r" % s) + self.assertIn(':', s) + + def _testAssertionErrorRE(self, regexp, fun, *args, **kwargs): + self.assertRaisesRegexp(AssertionError, regexp, fun, *args, **kwargs) + + def testExtendedAssertRaisesRE(self): + ## test _testAssertionErrorRE several fail cases: + def _key_err(msg): + raise KeyError(msg) + self.assertRaises(KeyError, + self._testAssertionErrorRE, r"^failed$", + _key_err, 'failed') + self.assertRaises(AssertionError, + self._testAssertionErrorRE, r"^failed$", + self.fail, '__failed__') + self._testAssertionErrorRE(r'failed.* does not match .*__failed__', + lambda: self._testAssertionErrorRE(r"^failed$", + self.fail, '__failed__') + ) + ## no exception in callable: + self.assertRaises(AssertionError, + self._testAssertionErrorRE, r"", int, 1) + self._testAssertionErrorRE(r'0 AssertionError not raised X.* does not match .*AssertionError not raised', + lambda: self._testAssertionErrorRE(r"^0 AssertionError not raised X$", + lambda: self._testAssertionErrorRE(r"", int, 1)) + ) + + def testExtendedAssertMethods(self): + ## assertIn, assertNotIn positive case: + self.assertIn('a', ['a', 'b', 'c', 'd']) + self.assertIn('a', ('a', 'b', 'c', 'd',)) + self.assertIn('a', 'cba') + self.assertIn('a', (c for c in 'cba' if c != 'b')) + self.assertNotIn('a', ['b', 'c', 'd']) + self.assertNotIn('a', ('b', 'c', 'd',)) + self.assertNotIn('a', 'cbd') + self.assertNotIn('a', (c.upper() for c in 'cba' if c != 'b')) + ## assertIn, assertNotIn negative case: + self._testAssertionErrorRE(r"'a' unexpectedly found in 'cba'", + self.assertNotIn, 'a', 'cba') + self._testAssertionErrorRE(r"1 unexpectedly found in \[0, 1, 2\]", + self.assertNotIn, 1, xrange(3)) + self._testAssertionErrorRE(r"'A' unexpectedly found in \['C', 'A'\]", + self.assertNotIn, 'A', (c.upper() for c in 'cba' if c != 'b')) + self._testAssertionErrorRE(r"'a' was not found in 'xyz'", + self.assertIn, 'a', 'xyz') + self._testAssertionErrorRE(r"5 was not found in \[0, 1, 2\]", + self.assertIn, 5, xrange(3)) + self._testAssertionErrorRE(r"'A' was not found in \['C', 'B'\]", + self.assertIn, 'A', (c.upper() for c in 'cba' if c != 'a')) + ## assertLogged, assertNotLogged positive case: + logSys = DefLogSys + self.pruneLog() + logSys.debug('test "xyz"') + self.assertLogged('test "xyz"') + self.assertLogged('test', 'xyz', all=True) + self.assertNotLogged('test', 'zyx', all=False) + self.assertNotLogged('test_zyx', 'zyx', all=True) + self.assertLogged('test', 'zyx', all=False) + self.pruneLog() + logSys.debug('xxxx "xxx"') + self.assertNotLogged('test "xyz"') + self.assertNotLogged('test', 'xyz', all=False) + self.assertNotLogged('test', 'xyz', 'zyx', all=True) + ## assertLogged, assertNotLogged negative case: + self.pruneLog() + logSys.debug('test "xyz"') + self._testAssertionErrorRE(r"All of the .* were found present in the log", + self.assertNotLogged, 'test "xyz"') + self._testAssertionErrorRE(r"was found in the log", + self.assertNotLogged, 'test', 'xyz', all=True) + self._testAssertionErrorRE(r"was not found in the log", + self.assertLogged, 'test', 'zyx', all=True) + self._testAssertionErrorRE(r"None among .* was found in the log", + self.assertLogged, 'test_zyx', 'zyx', all=False) + self._testAssertionErrorRE(r"All of the .* were found present in the log", + self.assertNotLogged, 'test', 'xyz', all=False) def testFormatterWithTraceBack(self): strout = StringIO() diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py index ab606dbe..6c0f46cf 100644 --- a/fail2ban/tests/servertestcase.py +++ b/fail2ban/tests/servertestcase.py @@ -252,7 +252,7 @@ class Transmitter(TransmitterBase): 3) ) self.assertEqual( self.transm.proceed(["stop", self.jailName]), (0, None)) - self.assertTrue(self.jailName not in self.server._Server__jails) + self.assertNotIn(self.jailName, self.server._Server__jails) def testStartStopAllJail(self): self.server.addJail("TestJail2", FAST_BACKEND) @@ -269,8 +269,8 @@ class Transmitter(TransmitterBase): 3) ) self.assertEqual(self.transm.proceed(["stop", "all"]), (0, None)) self.assertTrue( Utils.wait_for( lambda: not len(self.server._Server__jails), 3) ) - self.assertTrue(self.jailName not in self.server._Server__jails) - self.assertTrue("TestJail2" not in self.server._Server__jails) + self.assertNotIn(self.jailName, self.server._Server__jails) + self.assertNotIn("TestJail2", self.server._Server__jails) def testJailIdle(self): self.assertEqual( diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index a06fbef0..2838fa05 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -22,6 +22,7 @@ __author__ = "Yaroslav Halchenko" __copyright__ = "Copyright (c) 2013 Yaroslav Halchenko" __license__ = "GPL" +import itertools import logging import optparse import os @@ -323,16 +324,45 @@ def gatherTests(regexps=None, opts=None): return tests -# forwards compatibility of unittest.TestCase for some early python versions -if not hasattr(unittest.TestCase, 'assertIn'): - def __assertIn(self, a, b, msg=None): - if a not in b: # pragma: no cover - self.fail(msg or "%r was not found in %r" % (a, b)) - unittest.TestCase.assertIn = __assertIn - def __assertNotIn(self, a, b, msg=None): - if a in b: # pragma: no cover - self.fail(msg or "%r was found in %r" % (a, b)) - unittest.TestCase.assertNotIn = __assertNotIn +# +# Forwards compatibility of unittest.TestCase for some early python versions +# + +if not hasattr(unittest.TestCase, 'assertRaisesRegexp'): + def assertRaisesRegexp(self, exccls, regexp, fun, *args, **kwargs): + try: + fun(*args, **kwargs) + except exccls as e: + if re.search(regexp, e.message) is None: + self.fail('\"%s\" does not match \"%s\"' % (regexp, e.message)) + else: + self.fail('%s not raised' % getattr(exccls, '__name__')) + unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp + +# always custom following methods, because we use atm better version of both (support generators) +if True: ## if not hasattr(unittest.TestCase, 'assertIn'): + def assertIn(self, a, b, msg=None): + bb = b + wrap = False + if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring): + b, bb = itertools.tee(b) + wrap = True + if a not in b: + if wrap: bb = list(bb) + msg = msg or "%r was not found in %r" % (a, bb) + self.fail(msg) + unittest.TestCase.assertIn = assertIn + def assertNotIn(self, a, b, msg=None): + bb = b + wrap = False + if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring): + b, bb = itertools.tee(b) + wrap = True + if a in b: + if wrap: bb = list(bb) + msg = msg or "%r unexpectedly found in %r" % (a, bb) + self.fail(msg) + unittest.TestCase.assertNotIn = assertNotIn class LogCaptureTestCase(unittest.TestCase):