Merge branch '0.10' into 0.11

pull/2093/head
sebres 2018-02-26 19:30:54 +01:00
commit 47a7f83a0b
4 changed files with 40 additions and 16 deletions

View File

@ -63,6 +63,7 @@ ver. 0.10.3-dev-1 (20??/??/??) - development edition
### Fixes
* `filter.d/exim.conf`: failregex extended - SMTP call dropped: too many syntax or protocol errors (gh-2048);
* `action.d/badips.py`: implicit convert IPAddr to str, solves an issue "expected string, IPAddr found" (gh-2059);
### New Features
@ -71,6 +72,7 @@ ver. 0.10.3-dev-1 (20??/??/??) - development edition
* possibility to specify own regex-pattern to match epoch date-time, e. g. `^\[{EPOCH}\]` or `^\[{LEPOCH}\]` (gh-2038);
the epoch-pattern similar to `{DATE}` patterns does the capture and cuts out the match of whole pattern from the log-line,
e. g. date-pattern `^\[{LEPOCH}\]\s+:` will match and cut out `[1516469849551000] :` from begin of the log-line.
* badips.py now uses https instead of plain http when requesting badips.com (gh-2057);
ver. 0.10.2 (2018/01/18) - nothing-burns-like-the-cold

View File

@ -81,7 +81,7 @@ class BadIPsAction(ActionBase): # pragma: no cover - may be unavailable
"""
TIMEOUT = 10
_badips = "http://www.badips.com"
_badips = "https://www.badips.com"
def _Request(self, url, **argv):
return Request(url, headers={'User-Agent': self.agent}, **argv)
@ -186,6 +186,7 @@ class BadIPsAction(ActionBase): # pragma: no cover - may be unavailable
urlencode({'age': age})])
if key:
url = "&".join([url, urlencode({'key': key})])
self._logSys.debug('badips.com: get list, url: %r', url)
response = urlopen(self._Request(url), timeout=self.timeout)
except HTTPError as response:
messages = json.loads(response.read().decode('utf-8'))
@ -368,9 +369,10 @@ class BadIPsAction(ActionBase): # pragma: no cover - may be unavailable
Any issues with badips.com request.
"""
try:
url = "/".join([self._badips, "add", self.category, aInfo['ip']])
url = "/".join([self._badips, "add", self.category, str(aInfo['ip'])])
if self.key:
url = "?".join([url, urlencode({'key': self.key})])
self._logSys.debug('badips.com: ban, url: %r', url)
response = urlopen(self._Request(url), timeout=self.timeout)
except HTTPError as response:
messages = json.loads(response.read().decode('utf-8'))

View File

@ -21,15 +21,18 @@ import os
import unittest
import sys
from ..actiontestcase import CallingMap
from ..dummyjail import DummyJail
from ..utils import CONFIG_DIR
from ..servertestcase import IPAddr
from ..utils import LogCaptureTestCase, CONFIG_DIR
if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
class BadIPsActionTest(unittest.TestCase):
class BadIPsActionTest(LogCaptureTestCase):
available = True, None
pythonModule = None
modAction = None
def setUp(self):
"""Call before every test case."""
super(BadIPsActionTest, self).setUp()
@ -39,19 +42,25 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
self.jail.actions.add("test")
pythonModule = os.path.join(CONFIG_DIR, "action.d", "badips.py")
pythonModuleName = os.path.join(CONFIG_DIR, "action.d", "badips.py")
# check availability (once if not alive, used shorter timeout as in test cases):
if BadIPsActionTest.available[0]:
if not BadIPsActionTest.modAction:
BadIPsActionTest.modAction = self.jail.actions._load_python_module(pythonModule).Action
if not BadIPsActionTest.pythonModule:
BadIPsActionTest.pythonModule = self.jail.actions._load_python_module(pythonModuleName)
BadIPsActionTest.modAction = BadIPsActionTest.pythonModule.Action
self.jail.actions._load_python_module(pythonModuleName)
BadIPsActionTest.available = BadIPsActionTest.modAction.isAvailable(timeout=2 if unittest.F2B.fast else 10)
if not BadIPsActionTest.available[0]:
raise unittest.SkipTest('Skip test because service is not available: %s' % BadIPsActionTest.available[1])
self.jail.actions.add("badips", pythonModule, initOpts={
self.jail.actions.add("badips", pythonModuleName, initOpts={
'category': "ssh",
'banaction': "test",
'score': 5,
'key': "fail2ban-test-suite",
#'bankey': "fail2ban-test-suite",
'timeout': (3 if unittest.F2B.fast else 30),
})
self.action = self.jail.actions["badips"]
@ -80,8 +89,8 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
def testScore(self):
self.assertRaises(ValueError, setattr, self.action, "score", -5)
self.action.score = 5
self.action.score = "5"
self.action.score = 3
self.action.score = "3"
def testBanaction(self):
self.assertRaises(
@ -97,11 +106,22 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
self.action.updateperiod = 900
self.action.updateperiod = "900"
def testStart(self):
def testStartStop(self):
self.action.start()
self.assertTrue(len(self.action._bannedips) > 10)
def testStop(self):
self.testStart()
self.assertTrue(len(self.action._bannedips) > 10,
"%s is fewer as 10: %r" % (len(self.action._bannedips), self.action._bannedips))
self.action.stop()
self.assertTrue(len(self.action._bannedips) == 0)
def testBanIP(self):
aInfo = CallingMap({
'ip': IPAddr('192.0.2.1')
})
self.action.ban(aInfo)
self.assertLogged('badips.com: ban', wait=True)
self.pruneLog()
# produce an error using wrong category/IP:
self.action._category = 'f2b-this-category-dont-available-test-suite-only'
aInfo['ip'] = ''
self.assertRaises(BadIPsActionTest.pythonModule.HTTPError, self.action.ban, aInfo)
self.assertLogged('IP is invalid', 'invalid category', wait=True, all=False)

View File

@ -181,7 +181,7 @@ class StatusExtendedCymruInfo(unittest.TestCase):
if tc.available[0]:
cymru_info = self.__banManager.getBanListExtendedCymruInfo(
timeout=(2 if unittest.F2B.fast else 20))
else:
else: # pragma: no cover - availability (once after error case only)
cymru_info = tc.available[1]
if cymru_info.get("error"): # pragma: no cover - availability
tc.available = False, cymru_info