mirror of https://github.com/fail2ban/fail2ban
drop support for python 2.6 (hardly possible in modern CIs, new features would expect OrderedDicts, etc)
@ -33,7 +33,8 @@ install:
# coverage
- travis_retry pip install coverage
# coveralls (note coveralls doesn't support 2.6 now):
- if [[ $TRAVIS_PYTHON_VERSION != 2.6* ]]; then F2B_COV=1; else F2B_COV=0; fi
#- if [[ $TRAVIS_PYTHON_VERSION != 2.6* ]]; then F2B_COV=1; else F2B_COV=0; fi
- F2B_COV=1
- if [[ "$F2B_COV" = 1 ]]; then travis_retry pip install coveralls; fi
# codecov:
- travis_retry pip install codecov
@ -10,6 +10,8 @@ ver. 1.0.1-dev-1 (20??/??/??) - development nightly edition
### Compatibility:
* the minimum supported python version is now 2.7, if you have previous python version
you can use the 0.11 version of fail2ban or upgrade python (or even build it from source).
* potential incompatibility by parsing of options of `backend`, `filter` and `action` parameters (if they
are partially incorrect), because fail2ban could throw an error now (doesn't silently bypass it anymore).
* to v.0.11:
@ -33,7 +33,7 @@ Installation:
this case, you should use that instead.**
- [Python2 >= 2.6 or Python >= 3.2](https://www.python.org) or [PyPy](https://pypy.org)
- [Python2 >= 2.7 or Python >= 3.2](https://www.python.org) or [PyPy](https://pypy.org)
- python-setuptools, python-distutils or python3-setuptools for installation from source
@ -32,10 +32,7 @@ try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
from collections import OrderedDict
except ImportError:
OrderedDict = dict
from collections import OrderedDict
from .banmanager import BanManager, BanTicket
from .ipdns import IPAddr
@ -728,9 +728,7 @@ class Server:
except (ValueError, KeyError): # pragma: no cover
# Is known to be thrown after logging was shutdown once
# with older Pythons -- seems to be safe to ignore there
# At least it was still failing on 2.6.2-0ubuntu1 (jaunty)
if (2, 6, 3) <= sys.version_info < (3,) or \
(3, 2) <= sys.version_info:
if sys.version_info < (3,) or sys.version_info >= (3, 2):
# detailed format by deep log levels (as DEBUG=10):
if logger.getEffectiveLevel() <= logging.DEBUG: # pragma: no cover
@ -30,11 +30,7 @@ import sys
from threading import Lock
import time
from ..helpers import getLogger, _merge_dicts, uni_decode
from collections import OrderedDict
except ImportError: # pragma: 3.x no cover
OrderedDict = dict
from collections import OrderedDict
if sys.version_info >= (3, 3):
import importlib.machinery
@ -100,24 +96,12 @@ class Utils():
with self.__lock:
# clean cache if max count reached:
if len(cache) >= self.maxCount:
if OrderedDict is not dict:
# ordered (so remove some from ahead, FIFO)
while cache:
(ck, cv) = cache.popitem(last=False)
# if not yet expired (but has free slot for new entry):
if cv[1] > t and len(cache) < self.maxCount:
else: # pragma: 3.x no cover (dict is in 2.6 only)
remlst = []
for (ck, cv) in cache.iteritems():
# if expired:
if cv[1] <= t:
for ck in remlst:
self._cache.pop(ck, None)
# if still max count - remove any one:
while cache and len(cache) >= self.maxCount:
# ordered (so remove some from ahead, FIFO)
while cache:
(ck, cv) = cache.popitem(last=False)
# if not yet expired (but has free slot for new entry):
if cv[1] > t and len(cache) < self.maxCount:
# set now:
cache[k] = (v, t + self.maxTime)
@ -75,61 +75,59 @@ class CommandActionTest(LogCaptureTestCase):
lambda: substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
lambda: substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
# We need here an ordered, because the sequence of iteration is very important for this test
if OrderedDict:
# No cyclic recursion, just multiple replacement of tag <T>, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
# No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
# No cyclic recursion, just multiple replacement of same tags, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict((
('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
('ipmset', 'f2b-<name>'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables <lockingopt>'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '<multiport>'),
('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT',),
), OrderedDict((
('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('ipmset', 'f2b-any'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables -w'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT')
# Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
('DE', 'cycle <A>'),
)) ))
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
('DE', 'cycle <A>'),
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
)) ))
# No cyclic recursion, just multiple replacement of tag <T>, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
# No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
# No cyclic recursion, just multiple replacement of same tags, should be successful:
self.assertEqual(substituteRecursiveTags( OrderedDict((
('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
('ipmset', 'f2b-<name>'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables <lockingopt>'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '<multiport>'),
('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT',),
), OrderedDict((
('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('ipmset', 'f2b-any'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables -w'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT')
# Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
('DE', 'cycle <A>'),
)) ))
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
('DE', 'cycle <A>'),
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
)) ))
# missing tags are ok
self.assertEqual(substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
@ -70,16 +70,10 @@ class HelpersTest(unittest.TestCase):
self.assertEqual(splitwords(u' 1\n 2, 3'), ['1', '2', '3'])
if sys.version_info >= (2,7):
def _sh_call(cmd):
import subprocess
ret = subprocess.check_output(cmd, shell=True)
return uni_decode(ret).rstrip()
def _sh_call(cmd):
import subprocess
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout.read()
return uni_decode(ret).rstrip()
def _sh_call(cmd):
import subprocess
ret = subprocess.check_output(cmd, shell=True)
return uni_decode(ret).rstrip()
def _getSysPythonVersion():
return _sh_call("fail2ban-python -c 'import sys; print(tuple(sys.version_info))'")
@ -92,7 +86,7 @@ class SetupTest(unittest.TestCase):
setup = os.path.join(os.path.dirname(__file__), '..', '..', 'setup.py')
self.setup = os.path.exists(setup) and setup or None
if not self.setup and sys.version_info >= (2,7): # pragma: no cover - running not out of the source
if not self.setup: # pragma: no cover - running not out of the source
raise unittest.SkipTest(
"Seems to be running not out of source distribution"
" -- cannot locate setup.py")
@ -181,7 +181,7 @@ class BanTimeIncrDB(LogCaptureTestCase):
def setUp(self):
"""Call before every test case."""
super(BanTimeIncrDB, self).setUp()
if Fail2BanDb is None and sys.version_info >= (2,7): # pragma: no cover
if Fail2BanDb is None: # pragma: no cover
raise unittest.SkipTest(
"Unable to import fail2ban database module as sqlite is not "
@ -775,27 +775,11 @@ class Transmitter(TransmitterBase):
def testPythonActionMethodsAndProperties(self):
action = "TestCaseAction"
out = self.transm.proceed(
["set", self.jailName, "addaction", action,
os.path.join(TEST_FILES_DIR, "action.d", "action.py"),
'{"opt1": "value"}'])
self.assertEqual(out, (0, action))
except AssertionError:
if ((2, 6) <= sys.version_info < (2, 6, 5)) \
and '__init__() keywords must be strings' in out[1]:
# known issue http://bugs.python.org/issue2646 in 2.6 series
# since general Fail2Ban warnings are suppressed in normal
# operation -- let's issue Python's native warning here
import warnings
"Your version of Python %s seems to experience a known "
"issue forbidding correct operation of Fail2Ban: "
"http://bugs.python.org/issue2646 Upgrade your Python and "
"meanwhile other intestPythonActionMethodsAndProperties will "
"be skipped" % (sys.version))
out = self.transm.proceed(
["set", self.jailName, "addaction", action,
os.path.join(TEST_FILES_DIR, "action.d", "action.py"),
'{"opt1": "value"}'])
self.assertEqual(out, (0, action))
self.transm.proceed(["get", self.jailName,
"actionproperties", action])[1],
Reference in New Issue