|
|
@ -20,12 +20,34 @@
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
import unittest
|
|
|
|
import unittest
|
|
|
|
import sys
|
|
|
|
import sys
|
|
|
|
|
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
from socket import timeout
|
|
|
|
|
|
|
|
from ssl import SSLError
|
|
|
|
|
|
|
|
|
|
|
|
from ..actiontestcase import CallingMap
|
|
|
|
from ..actiontestcase import CallingMap
|
|
|
|
from ..dummyjail import DummyJail
|
|
|
|
from ..dummyjail import DummyJail
|
|
|
|
from ..servertestcase import IPAddr
|
|
|
|
from ..servertestcase import IPAddr
|
|
|
|
from ..utils import LogCaptureTestCase, CONFIG_DIR
|
|
|
|
from ..utils import LogCaptureTestCase, CONFIG_DIR
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if sys.version_info >= (3, ):
|
|
|
|
|
|
|
|
from urllib.error import HTTPError, URLError
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
from urllib2 import HTTPError, URLError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def skip_if_not_available(f):
|
|
|
|
|
|
|
|
"""Helper to decorate tests to skip in case of timeout/http-errors like "502 bad gateway".
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
@wraps(f)
|
|
|
|
|
|
|
|
def wrapper(self, *args):
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
return f(self, *args)
|
|
|
|
|
|
|
|
except (SSLError, HTTPError, URLError, timeout) as e: # pragma: no cover - timeout/availability issues
|
|
|
|
|
|
|
|
if not isinstance(e, timeout) and 'timed out' not in str(e):
|
|
|
|
|
|
|
|
if not hasattr(e, 'code') or e.code > 200 and e.code <= 404:
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
raise unittest.SkipTest('Skip test because of %s' % e)
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
class BadIPsActionTest(LogCaptureTestCase):
|
|
|
|
class BadIPsActionTest(LogCaptureTestCase):
|
|
|
|
|
|
|
|
|
|
|
@ -51,7 +73,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
BadIPsActionTest.pythonModule = self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
BadIPsActionTest.pythonModule = self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
BadIPsActionTest.modAction = BadIPsActionTest.pythonModule.Action
|
|
|
|
BadIPsActionTest.modAction = BadIPsActionTest.pythonModule.Action
|
|
|
|
self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
self.jail.actions._load_python_module(pythonModuleName)
|
|
|
|
BadIPsActionTest.available = BadIPsActionTest.modAction.isAvailable(timeout=2 if unittest.F2B.fast else 60)
|
|
|
|
BadIPsActionTest.available = BadIPsActionTest.modAction.isAvailable(timeout=2 if unittest.F2B.fast else 30)
|
|
|
|
if not BadIPsActionTest.available[0]:
|
|
|
|
if not BadIPsActionTest.available[0]:
|
|
|
|
raise unittest.SkipTest('Skip test because service is not available: %s' % BadIPsActionTest.available[1])
|
|
|
|
raise unittest.SkipTest('Skip test because service is not available: %s' % BadIPsActionTest.available[1])
|
|
|
|
|
|
|
|
|
|
|
@ -62,7 +84,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
'score': 5,
|
|
|
|
'score': 5,
|
|
|
|
'key': "fail2ban-test-suite",
|
|
|
|
'key': "fail2ban-test-suite",
|
|
|
|
#'bankey': "fail2ban-test-suite",
|
|
|
|
#'bankey': "fail2ban-test-suite",
|
|
|
|
'timeout': (3 if unittest.F2B.fast else 30),
|
|
|
|
'timeout': (3 if unittest.F2B.fast else 60),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
self.action = self.jail.actions["badips"]
|
|
|
|
self.action = self.jail.actions["badips"]
|
|
|
|
|
|
|
|
|
|
|
@ -73,6 +95,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
self.action._timer.cancel()
|
|
|
|
self.action._timer.cancel()
|
|
|
|
super(BadIPsActionTest, self).tearDown()
|
|
|
|
super(BadIPsActionTest, self).tearDown()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testCategory(self):
|
|
|
|
def testCategory(self):
|
|
|
|
categories = self.action.getCategories()
|
|
|
|
categories = self.action.getCategories()
|
|
|
|
self.assertIn("ssh", categories)
|
|
|
|
self.assertIn("ssh", categories)
|
|
|
@ -88,17 +111,20 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
# but valid for blacklisting.
|
|
|
|
# but valid for blacklisting.
|
|
|
|
self.action.bancategory = "mail"
|
|
|
|
self.action.bancategory = "mail"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testScore(self):
|
|
|
|
def testScore(self):
|
|
|
|
self.assertRaises(ValueError, setattr, self.action, "score", -5)
|
|
|
|
self.assertRaises(ValueError, setattr, self.action, "score", -5)
|
|
|
|
self.action.score = 3
|
|
|
|
self.action.score = 3
|
|
|
|
self.action.score = "3"
|
|
|
|
self.action.score = "3"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testBanaction(self):
|
|
|
|
def testBanaction(self):
|
|
|
|
self.assertRaises(
|
|
|
|
self.assertRaises(
|
|
|
|
ValueError, setattr, self.action, "banaction",
|
|
|
|
ValueError, setattr, self.action, "banaction",
|
|
|
|
"invalid-action")
|
|
|
|
"invalid-action")
|
|
|
|
self.action.banaction = "test"
|
|
|
|
self.action.banaction = "test"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testUpdateperiod(self):
|
|
|
|
def testUpdateperiod(self):
|
|
|
|
self.assertRaises(
|
|
|
|
self.assertRaises(
|
|
|
|
ValueError, setattr, self.action, "updateperiod", -50)
|
|
|
|
ValueError, setattr, self.action, "updateperiod", -50)
|
|
|
@ -107,6 +133,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
self.action.updateperiod = 900
|
|
|
|
self.action.updateperiod = 900
|
|
|
|
self.action.updateperiod = "900"
|
|
|
|
self.action.updateperiod = "900"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testStartStop(self):
|
|
|
|
def testStartStop(self):
|
|
|
|
self.action.start()
|
|
|
|
self.action.start()
|
|
|
|
self.assertTrue(len(self.action._bannedips) > 10,
|
|
|
|
self.assertTrue(len(self.action._bannedips) > 10,
|
|
|
@ -114,6 +141,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
|
|
|
|
self.action.stop()
|
|
|
|
self.action.stop()
|
|
|
|
self.assertTrue(len(self.action._bannedips) == 0)
|
|
|
|
self.assertTrue(len(self.action._bannedips) == 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@skip_if_not_available
|
|
|
|
def testBanIP(self):
|
|
|
|
def testBanIP(self):
|
|
|
|
aInfo = CallingMap({
|
|
|
|
aInfo = CallingMap({
|
|
|
|
'ip': IPAddr('192.0.2.1')
|
|
|
|
'ip': IPAddr('192.0.2.1')
|
|
|
|