|
|
|
@ -21,13 +21,16 @@ import os
|
|
|
|
|
import unittest
|
|
|
|
|
import sys
|
|
|
|
|
|
|
|
|
|
from ..actiontestcase import CallingMap
|
|
|
|
|
from ..dummyjail import DummyJail
|
|
|
|
|
from ..utils import CONFIG_DIR
|
|
|
|
|
from ..servertestcase import IPAddr
|
|
|
|
|
from ..utils import LogCaptureTestCase, CONFIG_DIR
|
|
|
|
|
|
|
|
|
|
if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
|
class BadIPsActionTest(unittest.TestCase):
|
|
|
|
|
class BadIPsActionTest(LogCaptureTestCase):
|
|
|
|
|
|
|
|
|
|
available = True, None
|
|
|
|
|
pythonModule = None
|
|
|
|
|
modAction = None
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
@ -39,17 +42,20 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
|
|
|
|
|
|
self.jail.actions.add("test")
|
|
|
|
|
|
|
|
|
|
pythonModule = os.path.join(CONFIG_DIR, "action.d", "badips.py")
|
|
|
|
|
pythonModuleName = os.path.join(CONFIG_DIR, "action.d", "badips.py")
|
|
|
|
|
|
|
|
|
|
# check availability (once if not alive, used shorter timeout as in test cases):
|
|
|
|
|
if BadIPsActionTest.available[0]:
|
|
|
|
|
if not BadIPsActionTest.modAction:
|
|
|
|
|
BadIPsActionTest.modAction = self.jail.actions._load_python_module(pythonModule).Action
|
|
|
|
|
if not BadIPsActionTest.pythonModule:
|
|
|
|
|
BadIPsActionTest.pythonModule = self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
|
BadIPsActionTest.modAction = BadIPsActionTest.pythonModule.Action
|
|
|
|
|
self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
|
BadIPsActionTest.available = BadIPsActionTest.modAction.isAvailable(timeout=2 if unittest.F2B.fast else 10)
|
|
|
|
|
if not BadIPsActionTest.available[0]:
|
|
|
|
|
raise unittest.SkipTest('Skip test because service is not available: %s' % BadIPsActionTest.available[1])
|
|
|
|
|
|
|
|
|
|
self.jail.actions.add("badips", pythonModule, initOpts={
|
|
|
|
|
self.jail.actions.add("badips", pythonModuleName, initOpts={
|
|
|
|
|
'category': "ssh",
|
|
|
|
|
'banaction': "test",
|
|
|
|
|
'score': 5,
|
|
|
|
@ -106,3 +112,16 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
|
"%s is fewer as 10: %r" % (len(self.action._bannedips), self.action._bannedips))
|
|
|
|
|
self.action.stop()
|
|
|
|
|
self.assertTrue(len(self.action._bannedips) == 0)
|
|
|
|
|
|
|
|
|
|
def testBanIP(self):
|
|
|
|
|
aInfo = CallingMap({
|
|
|
|
|
'ip': IPAddr('192.0.2.1')
|
|
|
|
|
})
|
|
|
|
|
self.action.ban(aInfo)
|
|
|
|
|
self.assertLogged('badips.com: ban', wait=True)
|
|
|
|
|
self.pruneLog()
|
|
|
|
|
# produce an error using wrong category/IP:
|
|
|
|
|
self.action._category = 'f2b-this-category-dont-available-test-suite-only'
|
|
|
|
|
aInfo['ip'] = ''
|
|
|
|
|
self.assertRaises(BadIPsActionTest.pythonModule.HTTPError, self.action.ban, aInfo)
|
|
|
|
|
self.assertLogged('IP is invalid', 'invalid category', wait=True, all=False)
|
|
|
|
|