Wrap open method to allow python3 to ignore unicode errors

pull/128/merge^2
Steven Hiscocks 12 years ago
parent 2bb3469644
commit 53be2ade86

@ -27,6 +27,7 @@ import os
import sys import sys
import time import time
import tempfile import tempfile
import functools
from server.jail import Jail from server.jail import Jail
from server.filterpoll import FilterPoll from server.filterpoll import FilterPoll
@ -38,6 +39,11 @@ from server.failmanager import FailManagerEmpty
# Useful helpers # Useful helpers
# #
if sys.version_info >= (3,):
open_ = functools.partial(open, errors='ignore')
else:
open_ = open
def _killfile(f, name): def _killfile(f, name):
try: try:
f.close() f.close()
@ -104,9 +110,9 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
# polling filter could detect the change # polling filter could detect the change
time.sleep(1) time.sleep(1)
if isinstance(fin, str): if isinstance(fin, str):
fin = open(fin, 'r') fin = open_(fin, 'r')
if isinstance(fout, str): if isinstance(fout, str):
fout = open(fout, mode) fout = open_(fout, mode)
# Skip # Skip
for i in xrange(skip): for i in xrange(skip):
_ = fin.readline() _ = fin.readline()
@ -184,7 +190,7 @@ class LogFileMonitor(unittest.TestCase):
"""Call before every test case.""" """Call before every test case."""
self.filter = self.name = 'NA' self.filter = self.name = 'NA'
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures') _, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
self.file = open(self.name, 'a') self.file = open_(self.name, 'a')
self.filter = FilterPoll(None) self.filter = FilterPoll(None)
self.filter.addLogPath(self.name) self.filter.addLogPath(self.name)
self.filter.setActive(True) self.filter.setActive(True)
@ -226,7 +232,7 @@ class LogFileMonitor(unittest.TestCase):
# we are not signaling as modified whenever # we are not signaling as modified whenever
# it gets away # it gets away
self.assertTrue(self.notModified()) self.assertTrue(self.notModified())
f = open(self.name, 'a') f = open_(self.name, 'a')
self.assertTrue(self.isModified()) self.assertTrue(self.isModified())
self.assertTrue(self.notModified()) self.assertTrue(self.notModified())
_sleep_4_poll() _sleep_4_poll()
@ -329,7 +335,7 @@ def get_monitor_failures_testcase(Filter_):
"""Call before every test case.""" """Call before every test case."""
self.filter = self.name = 'NA' self.filter = self.name = 'NA'
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures') _, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
self.file = open(self.name, 'a') self.file = open_(self.name, 'a')
self.jail = DummyJail() self.jail = DummyJail()
self.filter = Filter_(self.jail) self.filter = Filter_(self.jail)
self.filter.addLogPath(self.name) self.filter.addLogPath(self.name)
@ -454,7 +460,7 @@ def get_monitor_failures_testcase(Filter_):
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
# create a bogus file in the same directory and see if that doesn't affect # create a bogus file in the same directory and see if that doesn't affect
open(self.name + '.bak2', 'w').write('') open_(self.name + '.bak2', 'w').write('')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100) _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6) self.assertEqual(self.filter.failManager.getFailTotal(), 6)

Loading…
Cancel
Save