prevent sporadic assert errors on nested lists/dict differ at some level (mostly causes on python 3.4 because of different dict hashing);

introduces new test assertion routine `asesertSortedEqual` for comparison regardless elements order (regarding level/nestedOnly arguments).
pull/1867/head
sebres 2017-08-16 20:48:18 +02:00
parent 33874d6e53
commit 10c0d95401
7 changed files with 100 additions and 34 deletions

View File

@ -203,7 +203,7 @@ class StatusExtendedCymruInfo(unittest.TestCase):
ticket = BanTicket("8.0.0.0", 1167606000.0)
self.assertTrue(self.__banManager.addBanTicket(ticket))
cymru_info = self._getBanListExtendedCymruInfo()
self.assertDictEqual(dict((k, sorted(v)) for k, v in cymru_info.iteritems()),
{"asn": sorted(["nxdomain", "3356",]),
"country": sorted(["nxdomain", "US"]),
"rir": sorted(["nxdomain", "arin"])})
self.assertSortedEqual(cymru_info,
{"asn": ["nxdomain", "3356",],
"country": ["nxdomain", "US"],
"rir": ["nxdomain", "arin"]})

View File

@ -409,7 +409,7 @@ class FilterReaderTest(unittest.TestCase):
# Add sort as configreader uses dictionary and therefore order
# is unreliable
self.assertEqual(sorted(filterReader.convert()), sorted(output))
self.assertSortedEqual(filterReader.convert(), output)
filterReader = FilterReader("testcase01", "testcase01", {'maxlines': "5"},
share_config=TEST_FILES_DIR_SHARE_CFG, basedir=TEST_FILES_DIR)
@ -417,7 +417,7 @@ class FilterReaderTest(unittest.TestCase):
#filterReader.getOptions(["failregex", "ignoreregex"])
filterReader.getOptions(None)
output[-1][-1] = "5"
self.assertEqual(sorted(filterReader.convert()), sorted(output))
self.assertSortedEqual(filterReader.convert(), output)
def testFilterReaderSubstitionDefault(self):
output = [['set', 'jailname', 'addfailregex', 'to=sweet@example.com fromip=<IP>']]
@ -426,7 +426,7 @@ class FilterReaderTest(unittest.TestCase):
filterReader.read()
filterReader.getOptions(None)
c = filterReader.convert()
self.assertEqual(sorted(c), sorted(output))
self.assertSortedEqual(c, output)
def testFilterReaderSubstitionSet(self):
output = [['set', 'jailname', 'addfailregex', 'to=sour@example.com fromip=<IP>']]
@ -435,7 +435,7 @@ class FilterReaderTest(unittest.TestCase):
filterReader.read()
filterReader.getOptions(None)
c = filterReader.convert()
self.assertEqual(sorted(c), sorted(output))
self.assertSortedEqual(c, output)
def testFilterReaderSubstitionKnown(self):
output = [['set', 'jailname', 'addfailregex', 'to=test,sweet@example.com,test2,sweet@example.com fromip=<IP>']]
@ -446,7 +446,7 @@ class FilterReaderTest(unittest.TestCase):
filterReader.read()
filterReader.getOptions(None)
c = filterReader.convert()
self.assertEqual(sorted(c), sorted(output))
self.assertSortedEqual(c, output)
def testFilterReaderSubstitionFail(self):
# directly subst the same var :
@ -555,8 +555,8 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertRaises(ValueError, jails.convert)
comm_commands = jails.convert(allow_no_files=True)
self.maxDiff = None
self.assertEqual(sorted(comm_commands),
sorted([['add', 'emptyaction', 'auto'],
self.assertSortedEqual(comm_commands,
[['add', 'emptyaction', 'auto'],
['add', 'test-known-interp', 'auto'],
['multi-set', 'test-known-interp', 'addfailregex', [
'failure test 1 (filter.d/test.conf) <HOST>',
@ -592,7 +592,7 @@ class JailsReaderTest(LogCaptureTestCase):
"Jail 'missingaction' skipped, because of wrong configuration: Unable to read action 'noactionfileforthisaction'"],
['config-error',
"Jail 'missingbitsjail' skipped, because of wrong configuration: Unable to read the filter 'catchallthebadies'"],
]))
])
self.assertLogged("Errors in jail 'missingbitsjail'.")
self.assertNotLogged("Skipping...")
self.assertLogged("No file(s) found for glob /weapons/of/mass/destruction")
@ -801,7 +801,7 @@ class JailsReaderTest(LogCaptureTestCase):
# and there is logging information left to be passed into the
# server
self.assertEqual(sorted(commands),
self.assertSortedEqual(commands,
[['set', 'dbfile',
'/var/lib/fail2ban/fail2ban.sqlite3'],
['set', 'dbpurgeage', '1d'],

View File

@ -336,9 +336,9 @@ class DatabaseTest(LogCaptureTestCase):
tickets = self.db.getBansMerged()
self.assertEqual(len(tickets), 2)
self.assertEqual(
sorted(list(set(ticket.getIP() for ticket in tickets))),
sorted([ticket.getIP() for ticket in tickets]))
self.assertSortedEqual(
list(set(ticket.getIP() for ticket in tickets)),
[ticket.getIP() for ticket in tickets])
tickets = self.db.getBansMerged(jail=jail2)
self.assertEqual(len(tickets), 1)

View File

@ -1286,7 +1286,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.waitForTicks(1)
self.waitFailTotal(6, 10)
self.assertTrue(Utils.wait_for(lambda: len(self.jail) == 2, 10))
self.assertEqual(sorted([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()]),
self.assertSortedEqual([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()],
["192.0.2.1", "192.0.2.2"])
cls = MonitorJournalFailures
@ -1334,7 +1334,7 @@ class GetFailures(LogCaptureTestCase):
self.assertEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01])
self.filter.addLogPath(GetFailures.FILENAME_02, tail=True)
self.assertEqual(self.filter.getLogCount(), 2)
self.assertEqual(sorted(self.filter.getLogPaths()), sorted([GetFailures.FILENAME_01, GetFailures.FILENAME_02]))
self.assertSortedEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01, GetFailures.FILENAME_02])
def testTail(self):
# There must be no containters registered, otherwise [-1] indexing would be wrong
@ -1547,7 +1547,7 @@ class GetFailures(LogCaptureTestCase):
_ticket_tuple(self.filter.failManager.toBan())[0:3])
except FailManagerEmpty:
break
self.assertEqual(sorted(foundList), sorted(output))
self.assertSortedEqual(foundList, output)
def testGetFailuresMultiLineIgnoreRegex(self):
output = [("192.0.43.10", 2, 1124013599.0)]
@ -1582,7 +1582,7 @@ class GetFailures(LogCaptureTestCase):
_ticket_tuple(self.filter.failManager.toBan())[0:3])
except FailManagerEmpty:
break
self.assertEqual(sorted(foundList), sorted(output))
self.assertSortedEqual(foundList, output)
class DNSUtilsTests(unittest.TestCase):
@ -1679,10 +1679,10 @@ class DNSUtilsNetworkTests(unittest.TestCase):
self.assertEqual(res, [])
res = DNSUtils.textToIp('www.example.com', 'warn')
# sort ipaddr, IPv4 is always smaller as IPv6
self.assertEqual(sorted(res), ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
res = DNSUtils.textToIp('www.example.com', 'yes')
# sort ipaddr, IPv4 is always smaller as IPv6
self.assertEqual(sorted(res), ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
def testTextToIp(self):
# Test hostnames
@ -1695,7 +1695,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
res = DNSUtils.textToIp(s, 'yes')
if s == 'www.example.com':
# sort ipaddr, IPv4 is always smaller as IPv6
self.assertEqual(sorted(res), ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
else:
self.assertEqual(res, [])
# pure ips:

View File

@ -285,6 +285,25 @@ class TestsUtilsTest(LogCaptureTestCase):
self.assertDictEqual({'A': [1, 2]}, {'A': [1, 2]})
self.assertRaises(AssertionError, self.assertDictEqual,
{'A': [1, 2]}, {'A': [2, 1]})
## assertSortedEqual:
self.assertSortedEqual(['A', 'B'], ['B', 'A'])
self.assertSortedEqual([['A', 'B']], [['B', 'A']], level=2)
self.assertSortedEqual([['A', 'B']], [['B', 'A']], nestedOnly=False)
self.assertRaises(AssertionError, lambda: self.assertSortedEqual(
[['A', 'B']], [['B', 'A']], level=1, nestedOnly=True))
self.assertSortedEqual({'A': ['A', 'B']}, {'A': ['B', 'A']}, nestedOnly=False)
self.assertRaises(AssertionError, lambda: self.assertSortedEqual(
{'A': ['A', 'B']}, {'A': ['B', 'A']}, level=1, nestedOnly=True))
self.assertSortedEqual(['Z', {'A': ['B', 'C'], 'B': ['E', 'F']}], [{'B': ['F', 'E'], 'A': ['C', 'B']}, 'Z'],
nestedOnly=False)
self.assertSortedEqual(['Z', {'A': ['B', 'C'], 'B': ['E', 'F']}], [{'B': ['F', 'E'], 'A': ['C', 'B']}, 'Z'],
level=-1)
self.assertRaises(AssertionError, lambda: self.assertSortedEqual(
['Z', {'A': ['B', 'C'], 'B': ['E', 'F']}], [{'B': ['F', 'E'], 'A': ['C', 'B']}, 'Z']))
self._testAssertionErrorRE(r"\['A'\] != \['C', 'B'\]",
self.assertSortedEqual, ['A'], ['C', 'B'])
self._testAssertionErrorRE(r"\['A', 'B'\] != \['B', 'C'\]",
self.assertSortedEqual, ['A', 'B'], ['C', 'B'])
def testFormatterWithTraceBack(self):
strout = StringIO()

View File

@ -124,14 +124,14 @@ class TransmitterBase(unittest.TestCase):
self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdAdd, value])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[:n+1]))))
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[:n+1])), level=2)
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[:n+1]))))
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[:n+1])), level=2)
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdDel, value])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[n+1:]))))
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[n+1:])), level=2)
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[n+1:]))))
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[n+1:])), level=2)
def jailAddDelRegexTest(self, cmd, inValues, outValues, jail):
cmdAdd = "add" + cmd
@ -690,9 +690,9 @@ class Transmitter(TransmitterBase):
"be skipped" % (sys.version))
return
raise
self.assertEqual(
sorted(self.transm.proceed(["get", self.jailName,
"actionproperties", action])[1]),
self.assertSortedEqual(
self.transm.proceed(["get", self.jailName,
"actionproperties", action])[1],
['opt1', 'opt2'])
self.assertEqual(
self.transm.proceed(["get", self.jailName, "action", action,
@ -702,9 +702,9 @@ class Transmitter(TransmitterBase):
self.transm.proceed(["get", self.jailName, "action", action,
"opt2"]),
(0, None))
self.assertEqual(
sorted(self.transm.proceed(["get", self.jailName, "actionmethods",
action])[1]),
self.assertSortedEqual(
self.transm.proceed(["get", self.jailName, "actionmethods",
action])[1],
['ban', 'start', 'stop', 'testmethod', 'unban'])
self.assertEqual(
self.transm.proceed(["set", self.jailName, "action", action,

View File

@ -481,8 +481,8 @@ def gatherTests(regexps=None, opts=None):
# Forwards compatibility of unittest.TestCase for some early python versions
#
import difflib, pprint
if not hasattr(unittest.TestCase, 'assertDictEqual'):
import difflib, pprint
def assertDictEqual(self, d1, d2, msg=None):
self.assert_(isinstance(d1, dict), 'First argument is not a dictionary')
self.assert_(isinstance(d2, dict), 'Second argument is not a dictionary')
@ -495,6 +495,53 @@ if not hasattr(unittest.TestCase, 'assertDictEqual'):
self.fail(msg)
unittest.TestCase.assertDictEqual = assertDictEqual
def assertSortedEqual(self, a, b, level=1, nestedOnly=True, msg=None):
"""Compare complex elements (like dict, list or tuple) in sorted order until
level 0 not reached (initial level = -1 meant all levels),
or if nestedOnly set to True and some of the objects still contains nested lists or dicts.
"""
# used to recognize having element as nested dict, list or tuple:
def _is_nested(v):
if isinstance(v, dict):
return any(isinstance(v, (dict, list, tuple)) for v in v.itervalues())
return any(isinstance(v, (dict, list, tuple)) for v in v)
# level comparison routine:
def _assertSortedEqual(a, b, level, nestedOnly):
# first the lengths:
if len(a) != len(b):
raise ValueError('%r != %r' % (a, b))
# if not allow sorting of nested - just compare directly:
if not level and (nestedOnly and (not _is_nested(a) and not _is_nested(b))):
if a == b:
return
raise ValueError('%r != %r' % (a, b))
if isinstance(a, dict) and isinstance(b, dict): # compare dict's:
for k, v1 in a.iteritems():
v2 = b[k]
if isinstance(v1, (dict, list, tuple)) and isinstance(v2, (dict, list, tuple)):
_assertSortedEqual(v1, v2, level-1 if level != 0 else 0, nestedOnly)
elif v1 != v2:
raise ValueError('%r != %r' % (a, b))
else: # list, tuple, something iterable:
a = sorted(a, key=repr)
b = sorted(b, key=repr)
for v1, v2 in zip(a, b):
if isinstance(v1, (dict, list, tuple)) and isinstance(v2, (dict, list, tuple)):
_assertSortedEqual(v1, v2, level-1 if level != 0 else 0, nestedOnly)
elif v1 != v2:
raise ValueError('%r != %r' % (a, b))
# compare and produce assertion-error by exception:
try:
_assertSortedEqual(a, b, level, nestedOnly)
except Exception as e:
standardMsg = e.args[0] if isinstance(e, ValueError) else (str(e) + "\nwithin:")
diff = ('\n' + '\n'.join(difflib.ndiff(
pprint.pformat(a).splitlines(),
pprint.pformat(b).splitlines())))
msg = msg or (standardMsg + diff)
self.fail(msg)
unittest.TestCase.assertSortedEqual = assertSortedEqual
if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
def assertRaisesRegexp(self, exccls, regexp, fun, *args, **kwargs):
try: