add test case to cover gh-2277, testOverflowedIPCache testing overflow of IP-cache multi-threaded (2 "parasite" threads flooding cache)

pull/2351/head
sebres 2019-02-18 16:56:43 +01:00
parent 5a54a44559
commit 14f997231d
1 changed files with 38 additions and 0 deletions

View File

@ -1753,6 +1753,44 @@ class DNSUtilsTests(unittest.TestCase):
# here the whole cache should be empty: # here the whole cache should be empty:
self.assertEqual(len(c), 0) self.assertEqual(len(c), 0)
def testOverflowedIPCache(self):
# test overflow of IP-cache multi-threaded (2 "parasite" threads flooding cache):
from threading import Thread
from random import shuffle
# save original cache and use smaller cache during the test here:
_org_cache = IPAddr.CACHE_OBJ
cache = IPAddr.CACHE_OBJ = Utils.Cache(maxCount=5, maxTime=60)
result = list()
count = 1 if unittest.F2B.fast else 50
try:
# tester procedure of worker:
def _TestCacheStr2IP(forw=True, result=[], random=False):
try:
c = count
while c:
c -= 1
s = xrange(0, 256, 1) if forw else xrange(255, -1, -1)
if random: shuffle([i for i in s])
for i in s:
IPAddr('192.0.2.'+str(i), IPAddr.FAM_IPv4)
IPAddr('2001:db8::'+str(i), IPAddr.FAM_IPv6)
result.append(None)
except Exception as e:
DefLogSys.debug(e, exc_info=True)
result.append(e)
# 2 workers flooding it forwards and backwards:
th1 = Thread(target=_TestCacheStr2IP, args=(True, result)); th1.start()
th2 = Thread(target=_TestCacheStr2IP, args=(False, result)); th2.start()
# and here we flooding it with random IPs too:
_TestCacheStr2IP(True, result, True)
finally:
# wait for end of threads and restore cache:
th1.join()
th2.join()
IPAddr.CACHE_OBJ = _org_cache
self.assertEqual(result, [None]*3) # no errors
self.assertTrue(len(cache) <= cache.maxCount)
class DNSUtilsNetworkTests(unittest.TestCase): class DNSUtilsNetworkTests(unittest.TestCase):