test cases extended

several test-case functionality cherry picked from 0.10 (SkipTest, with_tmpdir)
pull/1523/head
sebres 2016-09-01 15:47:17 +02:00
parent 35b5fea038
commit 5f35b52b9a
5 changed files with 82 additions and 28 deletions

View File

@ -36,7 +36,7 @@ from ..client.jailsreader import JailsReader
from ..client.actionreader import ActionReader
from ..client.configurator import Configurator
from ..version import version
from .utils import LogCaptureTestCase
from .utils import LogCaptureTestCase, with_tmpdir
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
@ -94,9 +94,8 @@ option = %s
if not os.access(f, os.R_OK):
self.assertFalse(self.c.read('d')) # should not be readable BUT present
else:
# SkipTest introduced only in 2.7 thus can't yet use generally
# raise unittest.SkipTest("Skipping on %s -- access rights are not enforced" % platform)
pass
import platform
raise unittest.SkipTest("Skipping on %s -- access rights are not enforced" % platform.platform())
def testOptionalDotDDir(self):
self.assertFalse(self.c.read('c')) # nothing is there yet
@ -281,8 +280,8 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(eval(act[2][5]).get('agent', '<wrong>'), useragent)
self.assertEqual(act[3], ['set', 'blocklisttest', 'action', 'mynetwatchman', 'agent', useragent])
def testGlob(self):
d = tempfile.mkdtemp(prefix="f2b-temp")
@with_tmpdir
def testGlob(self, d):
# Generate few files
# regular file
f1 = os.path.join(d, 'f1')
@ -297,9 +296,6 @@ class JailReaderTest(LogCaptureTestCase):
self.assertEqual(JailReader._glob(f2), [])
self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
os.remove(f1)
os.remove(f2)
os.rmdir(d)
class FilterReaderTest(unittest.TestCase):
@ -433,10 +429,10 @@ class JailsReaderTestCache(LogCaptureTestCase):
cnt += 1
return cnt
def testTestJailConfCache(self):
@with_tmpdir
def testTestJailConfCache(self, basedir):
saved_ll = configparserinc.logLevel
configparserinc.logLevel = logging.DEBUG
basedir = tempfile.mkdtemp("fail2ban_conf")
try:
shutil.rmtree(basedir)
shutil.copytree(CONFIG_DIR, basedir)
@ -468,7 +464,6 @@ class JailsReaderTestCache(LogCaptureTestCase):
cnt = self._getLoggedReadCount(r'action\.d/iptables-common\.conf')
self.assertTrue(cnt == 1, "Unexpected count by reading of action files, cnt = %s" % cnt)
finally:
shutil.rmtree(basedir)
configparserinc.logLevel = saved_ll
@ -718,8 +713,8 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
def testMultipleSameAction(self):
basedir = tempfile.mkdtemp("fail2ban_conf")
@with_tmpdir
def testMultipleSameAction(self, basedir):
os.mkdir(os.path.join(basedir, "filter.d"))
os.mkdir(os.path.join(basedir, "action.d"))
open(os.path.join(basedir, "action.d", "testaction1.conf"), 'w').close()
@ -748,4 +743,33 @@ filter = testfilter1
# Python actions should not be passed `actname`
self.assertEqual(add_actions[-1][-1], "{}")
shutil.rmtree(basedir)
def testLogPathFileFilterBackend(self):
self.assertRaisesRegexp(ValueError, r"Have not found any log file for .* jail",
self._testLogPath, backend='polling')
def testLogPathSystemdBackend(self):
try: # pragma: systemd no cover
from ..server.filtersystemd import FilterSystemd
except Exception, e: # pragma: no cover
raise unittest.SkipTest("systemd python interface not available")
self._testLogPath(backend='systemd')
self._testLogPath(backend='systemd[journalflags=2]')
@with_tmpdir
def _testLogPath(self, basedir, backend):
jailfd = open(os.path.join(basedir, "jail.conf"), 'w')
jailfd.write("""
[testjail1]
enabled = true
backend = %s
logpath = %s/not/exist.log
/this/path/should/not/exist.log
action =
filter =
failregex = test <HOST>
""" % (backend, basedir))
jailfd.close()
jails = JailsReader(basedir=basedir)
self.assertTrue(jails.read())
self.assertTrue(jails.getOptions())
jails.convert()

View File

@ -48,12 +48,10 @@ class DatabaseTest(LogCaptureTestCase):
def setUp(self):
"""Call before every test case."""
super(DatabaseTest, self).setUp()
if Fail2BanDb is None and sys.version_info >= (2,7): # pragma: no cover
if Fail2BanDb is None: # pragma: no cover
raise unittest.SkipTest(
"Unable to import fail2ban database module as sqlite is not "
"available.")
elif Fail2BanDb is None:
return
_, self.dbFilename = tempfile.mkstemp(".db", "fail2ban_")
self.db = Fail2BanDb(self.dbFilename)

View File

@ -734,7 +734,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
pass
def testJournalFlagsArg(self):
self._initFilter(journalflags=0) # journal.RUNTIME_ONLY
self._initFilter(journalflags=2) # journal.RUNTIME_ONLY
def __str__(self):
return "MonitorJournalFailures%s(%s)" \

View File

@ -688,10 +688,7 @@ class Transmitter(TransmitterBase):
def testJournalMatch(self):
if not filtersystemd: # pragma: no cover
if sys.version_info >= (2, 7):
raise unittest.SkipTest(
"systemd python interface not available")
return
raise unittest.SkipTest("systemd python interface not available")
jailName = "TestJail2"
self.server.addJail(jailName, "systemd")
values = [
@ -791,10 +788,8 @@ class TransmitterLogging(TransmitterBase):
self.setGetTest("logtarget", "STDERR")
def testLogTargetSYSLOG(self):
if not os.path.exists("/dev/log") and sys.version_info >= (2, 7):
if not os.path.exists("/dev/log"):
raise unittest.SkipTest("'/dev/log' not present")
elif not os.path.exists("/dev/log"):
return
self.assertTrue(self.server.getSyslogSocket(), "auto")
self.setGetTest("logtarget", "SYSLOG")
self.assertTrue(self.server.getSyslogSocket(), "/dev/log")

View File

@ -26,10 +26,13 @@ import itertools
import logging
import os
import re
import tempfile
import shutil
import sys
import time
import unittest
from StringIO import StringIO
from functools import wraps
from ..server.mytime import MyTime
from ..helpers import getLogger
@ -46,6 +49,40 @@ if not CONFIG_DIR:
CONFIG_DIR = '/etc/fail2ban'
def with_tmpdir(f):
"""Helper decorator to create a temporary directory
Directory gets removed after function returns, regardless
if exception was thrown of not
"""
@wraps(f)
def wrapper(self, *args, **kwargs):
tmp = tempfile.mkdtemp(prefix="f2b-temp")
try:
return f(self, tmp, *args, **kwargs)
finally:
# clean up
shutil.rmtree(tmp)
return wrapper
# backwards compatibility to python 2.6:
if not hasattr(unittest, 'SkipTest'): # pragma: no cover
class SkipTest(Exception):
pass
unittest.SkipTest = SkipTest
_org_AddError = unittest._TextTestResult.addError
def addError(self, test, err):
if err[0] is SkipTest:
if self.showAll:
self.stream.writeln(str(err[1]))
elif self.dots:
self.stream.write('s')
self.stream.flush()
return
_org_AddError(self, test, err)
unittest._TextTestResult.addError = addError
def mtimesleep():
# no sleep now should be necessary since polling tracks now not only
# mtime but also ino and size
@ -218,8 +255,8 @@ if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
try:
fun(*args, **kwargs)
except exccls as e:
if re.search(regexp, e.message) is None:
self.fail('\"%s\" does not match \"%s\"' % (regexp, e.message))
if re.search(regexp, str(e)) is None:
self.fail('\"%s\" does not match \"%s\"' % (regexp, e))
else:
self.fail('%s not raised' % getattr(exccls, '__name__'))
unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp