badips test cases check availability of badips service (and skip this tests if it not available)

pull/1562/head
sebres 8 years ago
parent 9a7c753372
commit 8b0f6c5413

@ -34,7 +34,7 @@ else:
from fail2ban.server.actions import ActionBase
class BadIPsAction(ActionBase):
class BadIPsAction(ActionBase): # pragma: no cover - may be unavailable
"""Fail2Ban action which reports bans to badips.com, and also
blacklist bad IPs listed on badips.com by using another action's
ban method.
@ -105,6 +105,16 @@ class BadIPsAction(ActionBase):
# Used later for threading.Timer for updating badips
self._timer = None
@staticmethod
def isAvailable(timeout=1):
try:
response = urlopen(Request("/".join([BadIPsAction._badips]),
headers={'User-Agent': "Fail2Ban"}), timeout=timeout)
return True, ''
except Exception as e: # pragma: no cover
return False, e
def getCategories(self, incParents=False):
"""Get badips.com categories.

@ -85,6 +85,26 @@ class Actions(JailThread, Mapping):
## The ban manager.
self.__banManager = BanManager()
@staticmethod
def _load_python_module(pythonModule):
pythonModuleName = os.path.splitext(
os.path.basename(pythonModule))[0]
if sys.version_info >= (3, 3):
mod = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
else:
mod = imp.load_source(
pythonModuleName, pythonModule)
if not hasattr(mod, "Action"):
raise RuntimeError(
"%s module does not have 'Action' class" % pythonModule)
elif not issubclass(mod.Action, ActionBase):
raise RuntimeError(
"%s module %s does not implement required methods" % (
pythonModule, mod.Action.__name__))
return mod
def add(self, name, pythonModule=None, initOpts=None, reload=False):
"""Adds a new action.
@ -127,21 +147,7 @@ class Actions(JailThread, Mapping):
if pythonModule is None:
action = CommandAction(self._jail, name)
else:
pythonModuleName = os.path.splitext(
os.path.basename(pythonModule))[0]
if sys.version_info >= (3, 3):
customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
else:
customActionModule = imp.load_source(
pythonModuleName, pythonModule)
if not hasattr(customActionModule, "Action"):
raise RuntimeError(
"%s module does not have 'Action' class" % pythonModule)
elif not issubclass(customActionModule.Action, ActionBase):
raise RuntimeError(
"%s module %s does not implement required methods" % (
pythonModule, customActionModule.Action.__name__))
customActionModule = self._load_python_module(pythonModule)
action = customActionModule.Action(self._jail, name, **initOpts)
self._actions[name] = action

@ -24,9 +24,12 @@ import sys
from ..dummyjail import DummyJail
from ..utils import CONFIG_DIR
if sys.version_info >= (2,7):
if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
class BadIPsActionTest(unittest.TestCase):
available = True, None
modAction = None
def setUp(self):
"""Call before every test case."""
unittest.F2B.SkipIfNoNetwork()
@ -36,6 +39,15 @@ if sys.version_info >= (2,7):
self.jail.actions.add("test")
pythonModule = os.path.join(CONFIG_DIR, "action.d", "badips.py")
# check availability (once if not alive, used shorter timeout as in test cases):
if BadIPsActionTest.available[0]:
if not BadIPsActionTest.modAction:
BadIPsActionTest.modAction = self.jail.actions._load_python_module(pythonModule).Action
BadIPsActionTest.available = BadIPsActionTest.modAction.isAvailable(timeout=2 if unittest.F2B.fast else 10)
if not BadIPsActionTest.available[0]:
raise unittest.SkipTest('Skip test because service is not available: %s' % BadIPsActionTest.available[1])
self.jail.actions.add("badips", pythonModule, initOpts={
'category': "ssh",
'banaction': "test",

Loading…
Cancel
Save