|
|
|
@ -22,6 +22,7 @@
|
|
|
|
|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
|
|
|
|
|
__license__ = "GPL"
|
|
|
|
|
|
|
|
|
|
from __builtin__ import open as fopen
|
|
|
|
|
import unittest
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
@ -44,13 +45,18 @@ TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
|
|
|
|
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
|
|
|
|
|
# adding a sufficiently large buffer might help to guarantee that
|
|
|
|
|
# writes happen atomically.
|
|
|
|
|
# Overload also for python3 to use utf-8 encoding by default
|
|
|
|
|
if sys.version_info >= (3,):
|
|
|
|
|
def open_(filename, mode):
|
|
|
|
|
return open(filename, mode, 50000, encoding='utf-8', errors='ignore')
|
|
|
|
|
else:
|
|
|
|
|
def open_(filename, mode):
|
|
|
|
|
return open(filename, mode, 50000)
|
|
|
|
|
def open(*args):
|
|
|
|
|
"""Overload built in open so we could assure sufficiently large buffer
|
|
|
|
|
|
|
|
|
|
Explicit .flush would be needed to assure that changes leave the buffer
|
|
|
|
|
"""
|
|
|
|
|
if len(args) == 2:
|
|
|
|
|
# ~50kB buffer should be sufficient for all tests here.
|
|
|
|
|
args = args + (50000,)
|
|
|
|
|
if sys.version_info >= (3,):
|
|
|
|
|
return fopen(*args, encoding='utf-8', errors='ignore')
|
|
|
|
|
else:
|
|
|
|
|
return fopen(*args)
|
|
|
|
|
|
|
|
|
|
def _killfile(f, name):
|
|
|
|
|
try:
|
|
|
|
@ -126,7 +132,7 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
|
|
|
|
|
# polling filter could detect the change
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
if isinstance(fin, str): # pragma: no branch - only used with str in test cases
|
|
|
|
|
fin = open_(fin, 'r')
|
|
|
|
|
fin = open(fin, 'r')
|
|
|
|
|
# Skip
|
|
|
|
|
for i in xrange(skip):
|
|
|
|
|
_ = fin.readline()
|
|
|
|
@ -141,7 +147,7 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
|
|
|
|
|
i += 1
|
|
|
|
|
# Write: all at once and flush
|
|
|
|
|
if isinstance(fout, str):
|
|
|
|
|
fout = open_(fout, mode)
|
|
|
|
|
fout = open(fout, mode)
|
|
|
|
|
fout.write('\n'.join(lines))
|
|
|
|
|
fout.flush()
|
|
|
|
|
# to give other threads possibly some time to crunch
|
|
|
|
@ -209,7 +215,7 @@ class LogFileMonitor(unittest.TestCase):
|
|
|
|
|
"""Call before every test case."""
|
|
|
|
|
self.filter = self.name = 'NA'
|
|
|
|
|
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
|
|
|
|
|
self.file = open_(self.name, 'a')
|
|
|
|
|
self.file = open(self.name, 'a')
|
|
|
|
|
self.filter = FilterPoll(None)
|
|
|
|
|
self.filter.addLogPath(self.name)
|
|
|
|
|
self.filter.setActive(True)
|
|
|
|
@ -251,7 +257,7 @@ class LogFileMonitor(unittest.TestCase):
|
|
|
|
|
# we are not signaling as modified whenever
|
|
|
|
|
# it gets away
|
|
|
|
|
self.assertTrue(self.notModified())
|
|
|
|
|
f = open_(self.name, 'a')
|
|
|
|
|
f = open(self.name, 'a')
|
|
|
|
|
self.assertTrue(self.isModified())
|
|
|
|
|
self.assertTrue(self.notModified())
|
|
|
|
|
_sleep_4_poll()
|
|
|
|
@ -360,7 +366,7 @@ def get_monitor_failures_testcase(Filter_):
|
|
|
|
|
self.filter = self.name = 'NA'
|
|
|
|
|
self.name = '%s-%d' % (testclass_name, self.count)
|
|
|
|
|
MonitorFailures.count += 1 # so we have unique filenames across tests
|
|
|
|
|
self.file = open_(self.name, 'a')
|
|
|
|
|
self.file = open(self.name, 'a')
|
|
|
|
|
self.jail = DummyJail()
|
|
|
|
|
self.filter = Filter_(self.jail)
|
|
|
|
|
self.filter.addLogPath(self.name)
|
|
|
|
@ -483,7 +489,7 @@ def get_monitor_failures_testcase(Filter_):
|
|
|
|
|
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
|
|
|
|
|
|
|
|
|
# create a bogus file in the same directory and see if that doesn't affect
|
|
|
|
|
open_(self.name + '.bak2', 'w').write('')
|
|
|
|
|
open(self.name + '.bak2', 'w').write('')
|
|
|
|
|
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
|
|
|
|
|
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
|
|
|
|
|
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
|
|
|
|
|