diff --git a/.travis.yml b/.travis.yml index 8aa781be..3f07cc51 100644 --- a/.travis.yml +++ b/.travis.yml @@ -38,8 +38,8 @@ before_script: script: # Keep the legacy setup.py test approach of checking coverage for python2 - if [[ "$F2B_PY_2" ]]; then coverage run setup.py test; fi - # Coverage doesn't pick up setup.py test with python3, so run it directly - - if [[ "$F2B_PY_3" ]]; then coverage run bin/fail2ban-testcases; fi + # Coverage doesn't pick up setup.py test with python3, so run it directly (with same verbosity as from setup) + - if [[ "$F2B_PY_3" ]]; then coverage run bin/fail2ban-testcases --verbosity=2; fi # Use $VENV_BIN (not python) or else sudo will always run the system's python (2.7) - sudo $VENV_BIN/pip install . # Doc files should get installed on Travis under Linux diff --git a/bin/fail2ban-testcases b/bin/fail2ban-testcases index 4a93ac95..cfd95055 100755 --- a/bin/fail2ban-testcases +++ b/bin/fail2ban-testcases @@ -61,6 +61,10 @@ def get_opt_parser(): choices=('heavydebug', 'debug', 'info', 'notice', 'warning', 'error', 'critical'), default=None, help="Log level for the logger to use during running tests"), + Option('-v', "--verbosity", action="store", + dest="verbosity", type=int, + default=None, + help="Set numerical level of verbosity (0..4)"), Option("--log-direct", action="store_false", dest="log_lazy", default=True, diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index 7907965b..15058d76 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -83,7 +83,7 @@ def _killfile(f, name): def _maxWaitTime(wtime): - if unittest.F2B.fast: + if unittest.F2B.fast: # pragma: no cover wtime /= 10 return wtime @@ -695,19 +695,46 @@ class LogFileMonitor(LogCaptureTestCase): self.assertEqual(self.filter.failManager.getFailTotal(), 3) +class CommonMonitorTestCase(unittest.TestCase): + + def setUp(self): + """Call before every test case.""" + self._failTotal = 0 + + def waitFailTotal(self, 
count, delay=1.): + """Wait up to `delay` sec to assure that expected failure `count` is reached + """ + ret = Utils.wait_for( + lambda: self.filter.failManager.getFailTotal() >= self._failTotal + count and self.jail.isFilled(), + _maxWaitTime(delay)) + self._failTotal += count + return ret + + def isFilled(self, delay=1.): + """Wait up to `delay` sec to assure that it was modified or not + """ + return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay)) + + def isEmpty(self, delay=5): + """Wait up to `delay` sec to assure that it is empty again + """ + return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay)) + + def get_monitor_failures_testcase(Filter_): """Generator of TestCase's for different filters/backends """ # add Filter_'s name so we could easily identify bad cows testclass_name = tempfile.mktemp( - 'fail2ban', 'monitorfailures_%s' % (Filter_.__name__,)) + 'fail2ban', 'monitorfailures_%s_' % (Filter_.__name__,)) - class MonitorFailures(unittest.TestCase): + class MonitorFailures(CommonMonitorTestCase): count = 0 def setUp(self): """Call before every test case.""" + super(MonitorFailures, self).setUp() setUpMyTime() self.filter = self.name = 'NA' self.name = '%s-%d' % (testclass_name, self.count) @@ -737,11 +764,6 @@ def get_monitor_failures_testcase(Filter_): #time.sleep(0.2) # Give FS time to ack the removal pass - def isFilled(self, delay=1.): - """Wait up to `delay` sec to assure that it was modified or not - """ - return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay)) - def _sleep_4_poll(self): # Since FilterPoll relies on time stamps and some # actions might be happening too fast in the tests, @@ -749,12 +771,8 @@ def get_monitor_failures_testcase(Filter_): if isinstance(self.filter, FilterPoll): Utils.wait_for(self.filter.isAlive, _maxWaitTime(5)) - def isEmpty(self, delay=_maxWaitTime(5)): - # shorter wait time for not modified status - return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay)) - def assert_correct_last_attempt(self,
failures, count=None): - self.assertTrue(self.isFilled(10)) # give Filter a chance to react + self.assertTrue(self.waitFailTotal(count if count else failures[1], 10)) _assert_correct_last_attempt(self, self.jail, failures, count=count) def test_grow_file(self): @@ -770,7 +788,7 @@ def get_monitor_failures_testcase(Filter_): _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5) self.assertTrue(self.isFilled(10)) - # so we sleep for up to 2 sec for it not to become empty, + # so we sleep a bit for it not to become empty, # and meanwhile pass to other thread(s) and filter should # have gathered new failures and passed them into the # DummyJail @@ -905,17 +923,20 @@ def get_monitor_failures_testcase(Filter_): def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover """Generator of TestCase's for journal based filters/backends """ + + testclass_name = "monitorjournalfailures_%s" % (Filter_.__name__,) - class MonitorJournalFailures(unittest.TestCase): + class MonitorJournalFailures(CommonMonitorTestCase): def setUp(self): """Call before every test case.""" + super(MonitorJournalFailures, self).setUp() self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log") self.jail = DummyJail() self.filter = Filter_(self.jail) # UUID used to ensure that only meeages generated # as part of this test are picked up by the filter self.test_uuid = str(uuid.uuid4()) - self.name = "monitorjournalfailures-%s" % self.test_uuid + self.name = "%s-%s" % (testclass_name, self.test_uuid) self.filter.addJournalMatch([ "SYSLOG_IDENTIFIER=fail2ban-testcases", "TEST_FIELD=1", @@ -935,22 +956,10 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.filter.join() # wait for the thread to terminate pass - def __str__(self): - return "MonitorJournalFailures%s(%s)" \ - % (Filter_, hasattr(self, 'name') and self.name or 'tempfile') - - def isFilled(self, delay=1.): - """Wait up to `delay` sec to assure that it was modified or 
not - """ - return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay)) - - def isEmpty(self, delay=_maxWaitTime(5)): - # shorter wait time for not modified status - return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay)) - def assert_correct_ban(self, test_ip, test_attempts): - self.assertTrue(self.isFilled(_maxWaitTime(10))) # give Filter a chance to react + self.assertTrue(self.waitFailTotal(test_attempts, 10)) # give Filter a chance to react ticket = self.jail.getFailTicket() + self.assertTrue(ticket) attempts = ticket.getAttempt() ip = ticket.getIP() @@ -1018,6 +1027,8 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover # we should detect the failures self.assertTrue(self.isFilled(10)) + MonitorJournalFailures.__name__ = "MonitorJournalFailures<%s>(%s)" \ + % (Filter_.__name__, testclass_name) return MonitorJournalFailures diff --git a/fail2ban/tests/utils.py b/fail2ban/tests/utils.py index 232da780..764536bf 100644 --- a/fail2ban/tests/utils.py +++ b/fail2ban/tests/utils.py @@ -64,7 +64,7 @@ os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname( class DefaultTestOptions(optparse.Values): def __init__(self): self.__dict__ = { - 'log_level': None, 'log_lazy': True, + 'log_level': None, 'verbosity': None, 'log_lazy': True, 'log_traceback': None, 'full_traceback': None, 'fast': False, 'memory_db': False, 'no_gamin': False, 'no_network': False, 'negate_re': False @@ -78,15 +78,17 @@ def initProcess(opts): logSys = getLogger("fail2ban") # Numerical level of verbosity corresponding to a log "level" - verbosity = {'heavydebug': 4, - 'debug': 3, - 'info': 2, - 'notice': 2, - 'warning': 1, - 'error': 1, - 'critical': 0, - None: 1}[opts.log_level] - opts.verbosity = verbosity + verbosity = opts.verbosity + if verbosity is None: + verbosity = {'heavydebug': 4, + 'debug': 3, + 'info': 2, + 'notice': 2, + 'warning': 1, + 'error': 1, + 'critical': 0, + None: 1}[opts.log_level] + opts.verbosity = verbosity if 
opts.log_level is not None: # pragma: no cover # so we had explicit settings