Merge pull request #523 from grooverdan/more-0.9-tests

TST: more test of filters
pull/527/head
Steven Hiscocks 11 years ago
commit 087af27c65

@ -36,7 +36,7 @@ except ImportError:
from fail2ban.server.jail import Jail from fail2ban.server.jail import Jail
from fail2ban.server.filterpoll import FilterPoll from fail2ban.server.filterpoll import FilterPoll
from fail2ban.server.filter import FileFilter, DNSUtils from fail2ban.server.filter import Filter, FileFilter, DNSUtils
from fail2ban.server.failmanager import FailManager from fail2ban.server.failmanager import FailManager
from fail2ban.server.failmanager import FailManagerEmpty from fail2ban.server.failmanager import FailManagerEmpty
from fail2ban.server.mytime import MyTime from fail2ban.server.mytime import MyTime
@ -202,6 +202,12 @@ class BasicFilter(unittest.TestCase):
self.filter.setUseDns(False) self.filter.setUseDns(False)
self.assertEqual(self.filter.getUseDns(), 'no') self.assertEqual(self.filter.getUseDns(), 'no')
def testGetSetDatePattern(self):
self.assertEqual(self.filter.getDatePattern(), None)
self.filter.setDatePattern("^%Y-%m-%d-%H%M%S.%f %z")
self.assertEqual(self.filter.getDatePattern(),
("^%Y-%m-%d-%H%M%S.%f %z",
"Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset"))
class IgnoreIP(LogCaptureTestCase): class IgnoreIP(LogCaptureTestCase):

@ -29,31 +29,45 @@ import unittest, socket, time, tempfile, os, locale, sys, logging
from fail2ban.server.server import Server from fail2ban.server.server import Server
from fail2ban.server.jail import Jail from fail2ban.server.jail import Jail
from fail2ban.exceptions import UnknownJailException from fail2ban.exceptions import UnknownJailException
from fail2ban.tests.utils import LogCaptureTestCase
#from bin.fail2ban-client import Fail2banClient
try: try:
from fail2ban.server import filtersystemd from fail2ban.server import filtersystemd
except ImportError: except ImportError: # pragma: no cover
filtersystemd = None filtersystemd = None
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class StartStop(unittest.TestCase): class StartStop(LogCaptureTestCase):
def setUp(self): def setUp(self):
"""Call before every test case.""" self.client = Fail2banClient()
self.__server = Server() LogCaptureTestCase.setUp(self)
self.__server.setLogLevel(0) sock_fd, sock_name = tempfile.mkstemp('fail2ban.sock', 'transmitter')
self.__server.start(False) os.close(sock_fd)
os.remove(sock_name)
pidfile_fd, pidfile_name = tempfile.mkstemp(
'fail2ban.pid', 'transmitter')
os.close(pidfile_fd)
os.remove(pidfile_name)
self.client.__getCmdLineOptions([
('-c', os.path.join('fail2ban', 'tests', 'config')),
('-s', sock_name),
('-p', pidfile_name)])
self.client.__startServerAsync(sock_name, pidfile_name, False)
self.client.__waitOnServer()
def tearDown(self): def tearDown(self):
"""Call after every test case."""
self.__server.quit() self.__server.quit()
LogCaptureTestCase.tearDown(self)
def testStartStopJail(self): def testStartStopJail(self):
name = "TestCase" name = "TestCase"
self.__server.addJail(name) self.__server.addJail(name, "auto")
self.__server.startJail(name) self.__server.startJail(name)
time.sleep(1) time.sleep(1)
self.__server.stopJail(name) self.__server.stopJail(name)
self.printLog()
class TestServer(Server): class TestServer(Server):
def setLogLevel(self, *args, **kwargs): def setLogLevel(self, *args, **kwargs):
@ -129,9 +143,6 @@ class TransmitterBase(unittest.TestCase):
cmdAdd = "add" + cmd cmdAdd = "add" + cmd
cmdDel = "del" + cmd cmdDel = "del" + cmd
if outValues is None:
outValues = inValues
self.assertEqual( self.assertEqual(
self.transm.proceed(["get", jail, cmd]), (0, [])) self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(inValues): for n, value in enumerate(inValues):
@ -550,7 +561,7 @@ class Transmitter(TransmitterBase):
self.transm.proceed(["status", "INVALID", "COMMAND"])[0],1) self.transm.proceed(["status", "INVALID", "COMMAND"])[0],1)
def testJournalMatch(self): def testJournalMatch(self):
if not filtersystemd: if not filtersystemd: # pragma: no cover
if sys.version_info >= (2, 7): if sys.version_info >= (2, 7):
raise unittest.SkipTest( raise unittest.SkipTest(
"systemd python interface not avilable") "systemd python interface not avilable")
@ -650,7 +661,6 @@ class TransmitterLogging(TransmitterBase):
self.setGetTest("logtarget", "STDOUT") self.setGetTest("logtarget", "STDOUT")
self.setGetTest("logtarget", "STDERR") self.setGetTest("logtarget", "STDERR")
if sys.platform.lower().startswith('linux'):
self.setGetTest("logtarget", "SYSLOG") self.setGetTest("logtarget", "SYSLOG")
def testLogLevel(self): def testLogLevel(self):

@ -191,11 +191,12 @@ def gatherTests(regexps=None, no_network=False):
tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest)) tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest))
# Filter # Filter
if not no_network:
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP)) tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
tests.addTest(unittest.makeSuite(filtertestcase.BasicFilter))
tests.addTest(unittest.makeSuite(filtertestcase.LogFile)) tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor)) tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
if not no_network: if not no_network:
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIPDNS))
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures)) tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests)) tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
tests.addTest(unittest.makeSuite(filtertestcase.JailTests)) tests.addTest(unittest.makeSuite(filtertestcase.JailTests))

Loading…
Cancel
Save