mirror of https://github.com/fail2ban/fail2ban
TST: Ensure files are closed in tests to remove ResourceWarnings
parent
12df12f282
commit
c9b1b88bfc
|
@ -53,10 +53,12 @@ class ConfigReaderTest(unittest.TestCase):
|
|||
d_ = os.path.join(self.d, d)
|
||||
if not os.path.exists(d_):
|
||||
os.makedirs(d_)
|
||||
open("%s/%s" % (self.d, fname), "w").write("""
|
||||
f = open("%s/%s" % (self.d, fname), "w")
|
||||
f.write("""
|
||||
[section]
|
||||
option = %s
|
||||
""" % value)
|
||||
f.close()
|
||||
|
||||
def _remove(self, fname):
|
||||
os.unlink("%s/%s" % (self.d, fname))
|
||||
|
|
|
@ -123,7 +123,7 @@ def _assert_correct_last_attempt(utest, filter_, output, count=None):
|
|||
|
||||
_assert_equal_entries(utest, found, output, count)
|
||||
|
||||
def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line=""):
|
||||
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""):
|
||||
"""Copy lines from one file to another (which might be already open)
|
||||
|
||||
Returns open fout
|
||||
|
@ -132,8 +132,10 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
|
|||
# on old Python st_mtime is int, so we should give at least 1 sec so
|
||||
# polling filter could detect the change
|
||||
time.sleep(1)
|
||||
if isinstance(fin, str): # pragma: no branch - only used with str in test cases
|
||||
fin = open(fin, 'r')
|
||||
if isinstance(in_, str): # pragma: no branch - only used with str in test cases
|
||||
fin = open(in_, 'r')
|
||||
else:
|
||||
fin = in_
|
||||
# Skip
|
||||
for i in xrange(skip):
|
||||
_ = fin.readline()
|
||||
|
@ -151,6 +153,9 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
|
|||
fout = open(fout, mode)
|
||||
fout.write('\n'.join(lines))
|
||||
fout.flush()
|
||||
if isinstance(in_, str): # pragma: no branch - only used with str in test cases
|
||||
# Opened earlier, therefore must close it
|
||||
fin.close()
|
||||
# to give other threads possibly some time to crunch
|
||||
time.sleep(0.1)
|
||||
return fout
|
||||
|
@ -291,7 +296,7 @@ class LogFileMonitor(unittest.TestCase):
|
|||
#
|
||||
# if we rewrite the file at once
|
||||
self.file.close()
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
|
||||
self.filter.getFailures(self.name)
|
||||
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
|
||||
|
||||
|
@ -308,6 +313,7 @@ class LogFileMonitor(unittest.TestCase):
|
|||
def testNewChangeViaGetFailures_move(self):
|
||||
#
|
||||
# if we move file into a new location while it has been open already
|
||||
self.file.close()
|
||||
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
|
||||
n=14, mode='w')
|
||||
self.filter.getFailures(self.name)
|
||||
|
@ -316,7 +322,7 @@ class LogFileMonitor(unittest.TestCase):
|
|||
|
||||
# move aside, but leaving the handle still open...
|
||||
os.rename(self.name, self.name + '.bak')
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14).close()
|
||||
self.filter.getFailures(self.name)
|
||||
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
|
||||
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
|
||||
|
@ -454,7 +460,7 @@ def get_monitor_failures_testcase(Filter_):
|
|||
def test_rewrite_file(self):
|
||||
# if we rewrite the file at once
|
||||
self.file.close()
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
|
||||
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
||||
|
||||
# What if file gets overridden
|
||||
|
@ -468,6 +474,7 @@ def get_monitor_failures_testcase(Filter_):
|
|||
|
||||
def test_move_file(self):
|
||||
# if we move file into a new location while it has been open already
|
||||
self.file.close()
|
||||
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
|
||||
n=14, mode='w')
|
||||
# Poll might need more time
|
||||
|
@ -477,25 +484,25 @@ def get_monitor_failures_testcase(Filter_):
|
|||
|
||||
# move aside, but leaving the handle still open...
|
||||
os.rename(self.name, self.name + '.bak')
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14).close()
|
||||
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
||||
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
|
||||
|
||||
# now remove the moved file
|
||||
_killfile(None, self.name + '.bak')
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
|
||||
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
||||
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
|
||||
|
||||
|
||||
def test_new_bogus_file(self):
|
||||
# to make sure that watching whole directory does not effect
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
|
||||
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
||||
|
||||
# create a bogus file in the same directory and see if that doesn't affect
|
||||
open(self.name + '.bak2', 'w').write('')
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
|
||||
open(self.name + '.bak2', 'w').close()
|
||||
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
|
||||
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
||||
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
|
||||
_killfile(None, self.name + '.bak2')
|
||||
|
|
Loading…
Reference in New Issue