mirror of https://github.com/fail2ban/fail2ban
BF: Require Python 2.7+ for badips.py action
parent
5c7630c4be
commit
dfb46cfda6
|
@ -17,11 +17,13 @@
|
|||
# along with Fail2Ban; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
||||
import sys
|
||||
if sys.version_info < (2, 7):
|
||||
raise ImportError("badips.py action requires Python >= 2.7")
|
||||
import json
|
||||
from functools import partial
|
||||
import threading
|
||||
import logging
|
||||
import sys
|
||||
if sys.version_info >= (3, ):
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.parse import urlencode
|
||||
|
|
|
@ -31,7 +31,11 @@ if sys.version_info >= (3, 3):
|
|||
import importlib.machinery
|
||||
else:
|
||||
import imp
|
||||
from collections import Mapping, OrderedDict
|
||||
from collections import Mapping
|
||||
try:
|
||||
from collections import OrderedDict
|
||||
except ImportError:
|
||||
OrderedDict = None
|
||||
|
||||
from .banmanager import BanManager
|
||||
from .jailthread import JailThread
|
||||
|
@ -62,7 +66,10 @@ class Actions(JailThread, Mapping):
|
|||
JailThread.__init__(self)
|
||||
## The jail which contains this action.
|
||||
self._jail = jail
|
||||
self._actions = OrderedDict()
|
||||
if OrderedDict is not None:
|
||||
self._actions = OrderedDict()
|
||||
else:
|
||||
self._actions = dict()
|
||||
## The ban manager.
|
||||
self.__banManager = BanManager()
|
||||
|
||||
|
|
|
@ -32,64 +32,67 @@ if os.path.exists('config/fail2ban.conf'):
|
|||
else:
|
||||
CONFIG_DIR='/etc/fail2ban'
|
||||
|
||||
class BadIPsActionTest(unittest.TestCase):
|
||||
if sys.version_info >= (2,7):
|
||||
class BadIPsActionTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Call before every test case."""
|
||||
self.jail = DummyJail()
|
||||
def setUp(self):
|
||||
"""Call before every test case."""
|
||||
self.jail = DummyJail()
|
||||
|
||||
self.jail.actions.add("test")
|
||||
self.jail.actions.add("test")
|
||||
|
||||
pythonModule = os.path.join(CONFIG_DIR, "action.d", "badips.py")
|
||||
self.jail.actions.add("badips", pythonModule, initOpts={
|
||||
'category': "ssh",
|
||||
'banaction': "test",
|
||||
})
|
||||
self.action = self.jail.actions["badips"]
|
||||
pythonModule = os.path.join(CONFIG_DIR, "action.d", "badips.py")
|
||||
self.jail.actions.add("badips", pythonModule, initOpts={
|
||||
'category': "ssh",
|
||||
'banaction': "test",
|
||||
})
|
||||
self.action = self.jail.actions["badips"]
|
||||
|
||||
def tearDown(self):
|
||||
"""Call after every test case."""
|
||||
# Must cancel timer!
|
||||
if self.action._timer:
|
||||
self.action._timer.cancel()
|
||||
def tearDown(self):
|
||||
"""Call after every test case."""
|
||||
# Must cancel timer!
|
||||
if self.action._timer:
|
||||
self.action._timer.cancel()
|
||||
|
||||
def testCategory(self):
|
||||
categories = self.action.getCategories()
|
||||
self.assertTrue("ssh" in categories)
|
||||
self.assertTrue(len(categories) >= 10)
|
||||
def testCategory(self):
|
||||
categories = self.action.getCategories()
|
||||
self.assertTrue("ssh" in categories)
|
||||
self.assertTrue(len(categories) >= 10)
|
||||
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "category", "invalid-category")
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "category",
|
||||
"invalid-category")
|
||||
|
||||
# Not valid for reporting category...
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "category", "mail")
|
||||
# but valid for blacklisting.
|
||||
self.action.bancategory = "mail"
|
||||
# Not valid for reporting category...
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "category", "mail")
|
||||
# but valid for blacklisting.
|
||||
self.action.bancategory = "mail"
|
||||
|
||||
def testScore(self):
|
||||
self.assertRaises(ValueError, setattr, self.action, "score", -5)
|
||||
self.action.score = 5
|
||||
self.action.score = "5"
|
||||
def testScore(self):
|
||||
self.assertRaises(ValueError, setattr, self.action, "score", -5)
|
||||
self.action.score = 5
|
||||
self.action.score = "5"
|
||||
|
||||
def testBanaction(self):
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "banaction", "invalid-action")
|
||||
self.action.banaction = "test"
|
||||
def testBanaction(self):
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "banaction",
|
||||
"invalid-action")
|
||||
self.action.banaction = "test"
|
||||
|
||||
def testUpdateperiod(self):
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "updateperiod", -50)
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "updateperiod", 0)
|
||||
self.action.updateperiod = 900
|
||||
self.action.updateperiod = "900"
|
||||
def testUpdateperiod(self):
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "updateperiod", -50)
|
||||
self.assertRaises(
|
||||
ValueError, setattr, self.action, "updateperiod", 0)
|
||||
self.action.updateperiod = 900
|
||||
self.action.updateperiod = "900"
|
||||
|
||||
def testStart(self):
|
||||
self.action.start()
|
||||
self.assertTrue(len(self.action._bannedips) > 10)
|
||||
def testStart(self):
|
||||
self.action.start()
|
||||
self.assertTrue(len(self.action._bannedips) > 10)
|
||||
|
||||
def testStop(self):
|
||||
self.testStart()
|
||||
self.action.stop()
|
||||
self.assertTrue(len(self.action._bannedips) == 0)
|
||||
def testStop(self):
|
||||
self.testStart()
|
||||
self.action.stop()
|
||||
self.assertTrue(len(self.action._bannedips) == 0)
|
||||
|
|
Loading…
Reference in New Issue