Merge pull request #370 from grooverdan/test

MRG: more unit tests
pull/373/head
Daniel Black 2013-09-29 00:41:33 -07:00
commit eaba732d5b
8 changed files with 217 additions and 34 deletions

View File

@ -29,7 +29,7 @@ import unittest, logging, sys, time, os
if sys.version_info >= (2, 6): if sys.version_info >= (2, 6):
import json import json
else: else: # pragma: no cover
try: try:
import simplejson as json import simplejson as json
except ImportError: except ImportError:
@ -49,6 +49,7 @@ if json:
from testcases import samplestestcase from testcases import samplestestcase
from testcases.utils import FormatterWithTraceBack from testcases.utils import FormatterWithTraceBack
from testcases import actionstestcase
from server.mytime import MyTime from server.mytime import MyTime
from optparse import OptionParser, Option from optparse import OptionParser, Option
@ -108,7 +109,7 @@ stdout = logging.StreamHandler(sys.stdout)
fmt = ' %(message)s' fmt = ' %(message)s'
if opts.log_traceback: if opts.log_traceback or opts.full_traceback:
Formatter = FormatterWithTraceBack Formatter = FormatterWithTraceBack
fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
else: else:
@ -153,6 +154,7 @@ else: # pragma: no cover
tests.addTest(unittest.makeSuite(servertestcase.Transmitter)) tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
tests.addTest(unittest.makeSuite(servertestcase.JailTests)) tests.addTest(unittest.makeSuite(servertestcase.JailTests))
tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction)) tests.addTest(unittest.makeSuite(actiontestcase.ExecuteAction))
tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions))
# FailManager # FailManager
tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure)) tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
# BanManager # BanManager
@ -184,7 +186,7 @@ tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
if json: if json:
# Filter Regex tests with sample logs # Filter Regex tests with sample logs
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex)) tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
else: else: # pragma: no cover
print "I: Skipping filter samples testing. No simplejson/json module" print "I: Skipping filter samples testing. No simplejson/json module"
# #

View File

@ -359,6 +359,10 @@ class Action:
#@staticmethod #@staticmethod
def executeCmd(realCmd): def executeCmd(realCmd):
logSys.debug(realCmd) logSys.debug(realCmd)
if not realCmd:
logSys.debug("Nothing to do")
return True
_cmd_lock.acquire() _cmd_lock.acquire()
try: # Try wrapped within another try needed for python version < 2.5 try: # Try wrapped within another try needed for python version < 2.5
try: try:

View File

@ -0,0 +1,79 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Daniel Black
#
__author__ = "Daniel Black"
__copyright__ = "Copyright (c) 2013 Daniel Black"
__license__ = "GPL"
import unittest, time
import sys, os, tempfile
from server.actions import Actions
from dummyjail import DummyJail
class ExecuteActions(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__jail = DummyJail()
self.__actions = Actions(self.__jail)
self.__tmpfile, self.__tmpfilename = tempfile.mkstemp()
def tearDown(self):
os.remove(self.__tmpfilename)
def defaultActions(self):
self.__actions.addAction('ip')
self.__ip = self.__actions.getAction('ip')
self.__ip.setActionStart('echo ip start 64 >> "%s"' % self.__tmpfilename )
self.__ip.setActionBan('echo ip ban <ip> >> "%s"' % self.__tmpfilename )
self.__ip.setActionUnban('echo ip unban <ip> >> "%s"' % self.__tmpfilename )
self.__ip.setActionCheck('echo ip check <ip> >> "%s"' % self.__tmpfilename )
self.__ip.setActionStop('echo ip stop >> "%s"' % self.__tmpfilename )
def testActionsManipulation(self):
self.__actions.addAction('test')
self.assertTrue(self.__actions.getAction('test'))
self.assertTrue(self.__actions.getLastAction())
self.assertRaises(KeyError,self.__actions.getAction,*['nonexistant action'])
self.__actions.addAction('test1')
self.__actions.delAction('test')
self.__actions.delAction('test1')
self.assertRaises(KeyError, self.__actions.getAction, *['test'])
self.assertRaises(IndexError,self.__actions.getLastAction)
self.__actions.setBanTime(127)
self.assertEqual(self.__actions.getBanTime(),127)
self.assertRaises(ValueError, self.__actions.removeBannedIP, '127.0.0.1')
def testActionsOutput(self):
self.defaultActions()
self.__actions.start()
f = open(self.__tmpfilename)
time.sleep(3)
self.assertEqual(f.read(),"ip start 64\n")
self.__actions.stop()
self.__actions.join()
self.assertEqual(self.__actions.status(),[("Currently banned", 0 ),
("Total banned", 0 ), ("IP list", [] )])

View File

@ -58,6 +58,11 @@ class ExecuteAction(unittest.TestCase):
def _is_logged(self, s): def _is_logged(self, s):
return s in self._log.getvalue() return s in self._log.getvalue()
def testNameChange(self):
self.assertEqual(self.__action.getName(), "Test")
self.__action.setName("Tricky Test")
self.assertEqual(self.__action.getName(), "Tricky Test")
def testSubstituteRecursiveTags(self): def testSubstituteRecursiveTags(self):
aInfo = { aInfo = {
'HOST': "192.0.2.0", 'HOST': "192.0.2.0",
@ -101,9 +106,15 @@ class ExecuteAction(unittest.TestCase):
def testExecuteActionBan(self): def testExecuteActionBan(self):
self.__action.setActionStart("touch /tmp/fail2ban.test") self.__action.setActionStart("touch /tmp/fail2ban.test")
self.assertEqual(self.__action.getActionStart(), "touch /tmp/fail2ban.test")
self.__action.setActionStop("rm -f /tmp/fail2ban.test") self.__action.setActionStop("rm -f /tmp/fail2ban.test")
self.assertEqual(self.__action.getActionStop(), 'rm -f /tmp/fail2ban.test')
self.__action.setActionBan("echo -n") self.__action.setActionBan("echo -n")
self.assertEqual(self.__action.getActionBan(), 'echo -n')
self.__action.setActionCheck("[ -e /tmp/fail2ban.test ]") self.__action.setActionCheck("[ -e /tmp/fail2ban.test ]")
self.assertEqual(self.__action.getActionCheck(), '[ -e /tmp/fail2ban.test ]')
self.__action.setActionUnban("true")
self.assertEqual(self.__action.getActionUnban(), 'true')
self.assertFalse(self._is_logged('returned')) self.assertFalse(self._is_logged('returned'))
# no action was actually executed yet # no action was actually executed yet
@ -112,6 +123,45 @@ class ExecuteAction(unittest.TestCase):
self.assertTrue(self._is_logged('Invariant check failed')) self.assertTrue(self._is_logged('Invariant check failed'))
self.assertTrue(self._is_logged('returned successfully')) self.assertTrue(self._is_logged('returned successfully'))
def testExecuteActionEmptyUnban(self):
self.__action.setActionUnban("")
self.assertTrue(self.__action.execActionUnban(None))
self.assertTrue(self._is_logged('Nothing to do'))
def testExecuteActionStartCtags(self):
self.__action.setCInfo("HOST","192.0.2.0")
self.__action.setActionStart("touch /tmp/fail2ban.test.<HOST>")
self.__action.setActionStop("rm -f /tmp/fail2ban.test.<HOST>")
self.__action.setActionCheck("[ -e /tmp/fail2ban.test.192.0.2.0 ]")
self.assertTrue(self.__action.execActionStart())
def testExecuteActionCheckRestoreEnvironment(self):
self.__action.setActionStart("")
self.__action.setActionStop("rm -f /tmp/fail2ban.test")
self.__action.setActionBan("rm /tmp/fail2ban.test")
self.__action.setActionCheck("[ -e /tmp/fail2ban.test ]")
self.assertFalse(self.__action.execActionBan(None))
self.assertTrue(self._is_logged('Unable to restore environment'))
def testExecuteActionChangeCtags(self):
self.__action.setCInfo("ROST","192.0.2.0")
self.assertEqual(self.__action.getCInfo("ROST"),"192.0.2.0")
self.__action.delCInfo("ROST")
self.assertRaises(KeyError, self.__action.getCInfo, "ROST")
def testExecuteActionUnbanAinfo(self):
aInfo = {
'ABC': "123",
}
self.__action.setActionBan("touch /tmp/fail2ban.test.123")
self.__action.setActionUnban("rm /tmp/fail2ban.test.<ABC>")
self.assertTrue(self.__action.execActionBan(None))
self.assertTrue(self.__action.execActionUnban(aInfo))
def testExecuteActionStartEmpty(self):
self.__action.setActionStart("")
self.assertTrue(self.__action.execActionStart())
self.assertTrue(self._is_logged('Nothing to do'))
def testExecuteIncorrectCmd(self): def testExecuteIncorrectCmd(self):
Action.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null') Action.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')

View File

@ -164,6 +164,14 @@ class DateDetectorTest(unittest.TestCase):
print "WARNING: The following date templates overlap:" print "WARNING: The following date templates overlap:"
pprint.pprint(overlapedTemplates) pprint.pprint(overlapedTemplates)
def testDateTemplate(self):
t = DateTemplate()
t.setRegex('^a{3,5}b?c*$')
self.assertEqual(t.getRegex(), '^a{3,5}b?c*$')
self.assertRaises(Exception, t.getDate, '')
self.assertEqual(t.matchDate('aaaac').group(), 'aaaac')
# def testDefaultTempate(self): # def testDefaultTempate(self):
# self.__datedetector.setDefaultRegex("^\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}") # self.__datedetector.setDefaultRegex("^\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
# self.__datedetector.setDefaultPattern("%b %d %H:%M:%S") # self.__datedetector.setDefaultPattern("%b %d %H:%M:%S")

59
testcases/dummyjail.py Normal file
View File

@ -0,0 +1,59 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Fail2Ban developers
__copyright__ = "Copyright (c) 2012 Yaroslav Halchenko"
__license__ = "GPL"
from threading import Lock
class DummyJail(object):
"""A simple 'jail' to suck in all the tickets generated by Filter's
"""
def __init__(self):
self.lock = Lock()
self.queue = []
def __len__(self):
try:
self.lock.acquire()
return len(self.queue)
finally:
self.lock.release()
def putFailTicket(self, ticket):
try:
self.lock.acquire()
self.queue.append(ticket)
finally:
self.lock.release()
def getFailTicket(self):
try:
self.lock.acquire()
try:
return self.queue.pop()
except IndexError:
return False
finally:
self.lock.release()
def getName(self):
return "DummyJail #%s with %d tickets" % (id(self), len(self))

View File

@ -53,8 +53,18 @@ class AddFailure(unittest.TestCase):
def tearDown(self): def tearDown(self):
"""Call after every test case.""" """Call after every test case."""
def testAdd(self): def testFailManagerAdd(self):
self.assertEqual(self.__failManager.size(), 3) self.assertEqual(self.__failManager.size(), 3)
self.assertEqual(self.__failManager.getFailTotal(), 13)
self.__failManager.setFailTotal(0)
self.assertEqual(self.__failManager.getFailTotal(), 0)
self.__failManager.setFailTotal(13)
def testFailManagerMaxTime(self):
self.assertEqual(self.__failManager.getMaxTime(), 600)
self.__failManager.setMaxTime(13)
self.assertEqual(self.__failManager.getMaxTime(), 13)
self.__failManager.setMaxTime(600)
def _testDel(self): def _testDel(self):
self.__failManager.delFailure('193.168.0.128') self.__failManager.delFailure('193.168.0.128')

View File

@ -306,36 +306,7 @@ class LogFileMonitor(unittest.TestCase):
from threading import Lock from threading import Lock
class DummyJail(object): from dummyjail import DummyJail
"""A simple 'jail' to suck in all the tickets generated by Filter's
"""
def __init__(self):
self.lock = Lock()
self.queue = []
def __len__(self):
try:
self.lock.acquire()
return len(self.queue)
finally:
self.lock.release()
def putFailTicket(self, ticket):
try:
self.lock.acquire()
self.queue.append(ticket)
finally:
self.lock.release()
def getFailTicket(self):
try:
self.lock.acquire()
return self.queue.pop()
finally:
self.lock.release()
def getName(self):
return "DummyJail #%s with %d tickets" % (id(self), len(self))
def get_monitor_failures_testcase(Filter_): def get_monitor_failures_testcase(Filter_):
"""Generator of TestCase's for different filters/backends """Generator of TestCase's for different filters/backends